@Produces(MediaType.SERVER_SENT_EVENTS)
public void getSSE(@PathParam("streamName") final String streamName, @Context final SseEventSink sink,
@Context final Sse sse) {
- final var listener = listenersBroker.listenerFor(streamName);
+ final var listener = listenersBroker.getStream(streamName);
if (listener == null) {
LOG.debug("Listener for stream with name {} was not found.", streamName);
throw new WebApplicationException("No such stream: " + streamName, Status.NOT_FOUND);
} else if (CreateNotificationStream.QNAME.equals(type)) {
return listenersBroker.createNotificationStream(input, localDatabind.modelContext());
} else if (SubscribeDeviceNotification.QNAME.equals(type)) {
- return listenersBroker.createDeviceNotificationListener(input,
+ return listenersBroker.createDeviceNotificationStream(input,
listenersBroker.prepareUriByStreamName(uriInfo, "").toString(), mountPointService);
}
}
import org.opendaylight.mdsal.dom.api.DOMNotification;
import org.opendaylight.mdsal.dom.api.DOMNotificationListener;
import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev140708.NotificationOutputTypeGrouping.NotificationOutputType;
-import org.opendaylight.yangtools.yang.data.codec.gson.JSONCodecFactorySupplier;
import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
abstract class AbstractNotificationListenerAdaptor extends AbstractStream<DOMNotification>
implements DOMNotificationListener {
private static final Logger LOG = LoggerFactory.getLogger(AbstractNotificationListenerAdaptor.class);
- private static final NotificationFormatterFactory JSON_FORMATTER_FACTORY =
- JSONNotificationFormatter.createFactory(JSONCodecFactorySupplier.RFC7951);
AbstractNotificationListenerAdaptor(final String streamName, final NotificationOutputType outputType,
final ListenersBroker listenersBroker) {
private static NotificationFormatterFactory getFormatterFactory(final NotificationOutputType outputType) {
return switch (outputType) {
- case JSON -> JSON_FORMATTER_FACTORY;
+ case JSON -> JSONNotificationFormatter.FACTORY;
case XML -> XMLNotificationFormatter.FACTORY;
};
}
import com.google.common.base.MoreObjects;
import com.google.common.base.MoreObjects.ToStringHelper;
-import com.google.common.util.concurrent.ListenableFuture;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
-import java.util.concurrent.ExecutionException;
import javax.xml.xpath.XPathExpressionException;
import org.checkerframework.checker.lock.qual.GuardedBy;
import org.checkerframework.checker.lock.qual.Holding;
import org.eclipse.jdt.annotation.NonNull;
-import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
import org.opendaylight.mdsal.dom.api.DOMDataBroker;
import org.opendaylight.restconf.common.errors.RestconfDocumentedException;
import org.opendaylight.restconf.nb.rfc8040.ReceiveEventsParams;
/**
* Base superclass for all stream types.
*/
-abstract class AbstractStream<T> implements AutoCloseable {
+abstract class AbstractStream<T> {
private static final Logger LOG = LoggerFactory.getLogger(AbstractStream.class);
private final EventFormatterFactory<T> formatterFactory;
return !subscribers.isEmpty();
}
- /**
- * Return all subscribers of listener.
- *
- * @return Set of all subscribers.
- */
- final synchronized Set<StreamSessionHandler> getSubscribers() {
- return new HashSet<>(subscribers);
- }
-
- @Override
- public final synchronized void close() throws InterruptedException, ExecutionException {
- if (registration != null) {
- registration.close();
- registration = null;
- }
- deleteDataInDS().get();
- subscribers.clear();
- }
-
/**
* Registers {@link StreamSessionHandler} subscriber.
*
}
/**
- * Removes {@link StreamSessionHandler} subscriber.
+ * Removes {@link StreamSessionHandler} subscriber. If this was the last subscriber also shut down this stream and
+ * initiate its removal from global state.
*
* @param subscriber SSE or WS session handler.
*/
synchronized void removeSubscriber(final StreamSessionHandler subscriber) {
subscribers.remove(subscriber);
LOG.debug("Subscriber {} is removed", subscriber);
- if (!hasSubscribers()) {
- listenersBroker.removeAndCloseListener(this);
+ if (subscribers.isEmpty()) {
+ closeRegistration();
+ listenersBroker.removeStream(dataBroker, this);
+ }
+ }
+
+ /**
+ * Signal the end-of-stream condition to subscribers, shut down this stream and initiate its removal from global
+ * state.
+ */
+ final synchronized void endOfStream() {
+ closeRegistration();
+
+ final var it = subscribers.iterator();
+ while (it.hasNext()) {
+ it.next().endOfStream();
+ it.remove();
+ }
+
+ listenersBroker.removeStream(dataBroker, this);
+ }
+
+ @Holding("this")
+ private void closeRegistration() {
+ if (registration != null) {
+ registration.close();
+ registration = null;
}
}
*/
@SuppressWarnings("checkstyle:hiddenField")
// FIXME: this is pure lifecycle nightmare just because ...
- public void setCloseVars(final DOMDataBroker dataBroker, final DatabindProvider databindProvider) {
+ public synchronized void setCloseVars(final DOMDataBroker dataBroker, final DatabindProvider databindProvider) {
this.dataBroker = dataBroker;
this.databindProvider = databindProvider;
}
- /**
- * Delete data in DS.
- */
- // FIXME: here we touch datastore, which probably should be done by whoever instantiated us or created the resource,
- // or they should be giving us the transaction
- private ListenableFuture<?> deleteDataInDS() {
- final var wTx = dataBroker.newWriteOnlyTransaction();
- wTx.delete(LogicalDatastoreType.OPERATIONAL, RestconfStateStreams.restconfStateStreamPath(streamName));
- return wTx.commit();
- }
-
@Override
public final String toString() {
return addToStringAttributes(MoreObjects.toStringHelper(this)).toString();
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
import org.opendaylight.yangtools.yang.model.api.stmt.SchemaNodeIdentifier.Absolute;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
* {@link DeviceNotificationListenerAdaptor} is responsible to track events on notifications.
*/
public final class DeviceNotificationListenerAdaptor extends AbstractNotificationListenerAdaptor
implements DOMMountPointListener {
- private static final Logger LOG = LoggerFactory.getLogger(DeviceNotificationListenerAdaptor.class);
-
private final @NonNull EffectiveModelContext effectiveModel;
private final @NonNull DOMMountPointService mountPointService;
private final @NonNull YangInstanceIdentifier instanceIdentifier;
@Override
public void onMountPointRemoved(final YangInstanceIdentifier path) {
if (instanceIdentifier.equals(path)) {
- getSubscribers().forEach(subscriber -> {
- if (subscriber.isConnected()) {
- subscriber.sendDataMessage("Device disconnected");
- }
- if (subscriber instanceof SSESessionHandler sseSessionHandler) {
- try {
- sseSessionHandler.close();
- } catch (IllegalStateException e) {
- LOG.warn("Ignoring exception while closing sse session");
- }
- }
- });
- listenersBroker.removeAndCloseDeviceNotificationListener(this);
resetListenerRegistration();
+ endOfStream();
}
}
}
*/
package org.opendaylight.restconf.nb.rfc8040.streams;
-import static java.util.Objects.requireNonNull;
-
+import com.google.common.annotations.VisibleForTesting;
import com.google.gson.stream.JsonWriter;
import java.io.IOException;
import java.io.StringWriter;
import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
final class JSONNotificationFormatter extends NotificationFormatter {
- private static final @NonNull String NOTIFICATION_NAME;
+ private static final @NonNull String NOTIFICATION_NAME =
+ $YangModuleInfoImpl.getInstance().getName().getLocalName() + ":notification";
+ @VisibleForTesting
+ static final JSONNotificationFormatter EMPTY = new JSONNotificationFormatter(TextParameters.EMPTY);
- static {
- final var ietfRestconfName = $YangModuleInfoImpl.getInstance().getName();
- NOTIFICATION_NAME = ietfRestconfName.getLocalName() + ":notification";
- }
+ static final NotificationFormatterFactory FACTORY = new NotificationFormatterFactory(EMPTY) {
+ @Override
+ JSONNotificationFormatter getFormatter(final TextParameters textParams, final String xpathFilter)
+ throws XPathExpressionException {
+ return new JSONNotificationFormatter(textParams, xpathFilter);
+ }
- private final JSONCodecFactorySupplier codecSupplier;
+ @Override
+ JSONNotificationFormatter newFormatter(final TextParameters textParams) {
+ return new JSONNotificationFormatter(textParams);
+ }
+ };
- private JSONNotificationFormatter(final TextParameters textParams, final JSONCodecFactorySupplier codecSupplier) {
+ private JSONNotificationFormatter(final TextParameters textParams) {
super(textParams);
- this.codecSupplier = requireNonNull(codecSupplier);
}
- private JSONNotificationFormatter(final TextParameters textParams, final String xpathFilter,
- final JSONCodecFactorySupplier codecSupplier) throws XPathExpressionException {
+ private JSONNotificationFormatter(final TextParameters textParams, final String xpathFilter)
+ throws XPathExpressionException {
super(textParams, xpathFilter);
- this.codecSupplier = requireNonNull(codecSupplier);
- }
-
- static NotificationFormatterFactory createFactory(final JSONCodecFactorySupplier codecSupplier) {
- final var empty = new JSONNotificationFormatter(TextParameters.EMPTY, codecSupplier);
- return new NotificationFormatterFactory(empty) {
- @Override
- JSONNotificationFormatter getFormatter(final TextParameters textParams, final String xpathFilter)
- throws XPathExpressionException {
- return new JSONNotificationFormatter(textParams, xpathFilter, codecSupplier);
- }
-
- @Override
- JSONNotificationFormatter newFormatter(final TextParameters textParams) {
- return new JSONNotificationFormatter(textParams, codecSupplier);
- }
- };
}
@Override
.name(NOTIFICATION_NAME).beginObject()
.name("event-time").value(toRFC3339(now));
writeNotificationBody(JSONNormalizedNodeStreamWriter.createNestedWriter(
- codecSupplier.getShared(schemaContext), input.getType(), null, jsonWriter), input.getBody());
+ JSONCodecFactorySupplier.RFC7951.getShared(schemaContext), input.getType(), null, jsonWriter),
+ input.getBody());
jsonWriter.endObject().endObject();
}
return writer.toString();
*/
package org.opendaylight.restconf.nb.rfc8040.streams;
-import static com.google.common.base.Strings.isNullOrEmpty;
import static java.util.Objects.requireNonNull;
-import com.google.common.collect.BiMap;
-import com.google.common.collect.HashBiMap;
import com.google.common.collect.ImmutableSet;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.MoreExecutors;
import java.net.URI;
import java.util.Optional;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.locks.StampedLock;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
import javax.ws.rs.core.UriInfo;
import org.eclipse.jdt.annotation.NonNull;
import org.eclipse.jdt.annotation.Nullable;
+import org.opendaylight.mdsal.common.api.CommitInfo;
import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
import org.opendaylight.mdsal.dom.api.DOMDataBroker;
-import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteOperations;
-import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteTransaction;
import org.opendaylight.mdsal.dom.api.DOMMountPoint;
import org.opendaylight.mdsal.dom.api.DOMMountPointService;
import org.opendaylight.mdsal.dom.api.DOMNotificationService;
import org.opendaylight.mdsal.dom.api.DOMSchemaService;
import org.opendaylight.restconf.common.errors.RestconfDocumentedException;
import org.opendaylight.restconf.common.errors.RestconfFuture;
-import org.opendaylight.restconf.nb.rfc8040.ReceiveEventsParams;
import org.opendaylight.restconf.nb.rfc8040.URLConstants;
import org.opendaylight.restconf.nb.rfc8040.databind.DatabindProvider;
-import org.opendaylight.restconf.nb.rfc8040.utils.parser.IdentifierCodec;
import org.opendaylight.yang.gen.v1.urn.opendaylight.device.notification.rev221106.SubscribeDeviceNotificationInput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.device.notification.rev221106.SubscribeDeviceNotificationOutput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.remote.rev140114.CreateDataChangeEventSubscriptionInput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.remote.rev140114.CreateDataChangeEventSubscriptionOutput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.remote.rev140114.CreateNotificationStreamInput;
-import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev140708.CreateDataChangeEventSubscriptionInput1.Scope;
+import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev140708.CreateDataChangeEventSubscriptionInput1;
import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev140708.NotificationOutputTypeGrouping;
import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev140708.NotificationOutputTypeGrouping.NotificationOutputType;
import org.opendaylight.yangtools.yang.common.ErrorTag;
import org.opendaylight.yangtools.yang.common.ErrorType;
import org.opendaylight.yangtools.yang.common.QName;
-import org.opendaylight.yangtools.yang.common.QNameModule;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifierWithPredicates;
import org.opendaylight.yangtools.yang.data.api.schema.LeafNode;
import org.opendaylight.yangtools.yang.data.api.schema.LeafSetEntryNode;
import org.opendaylight.yangtools.yang.data.api.schema.LeafSetNode;
-import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
import org.opendaylight.yangtools.yang.data.impl.schema.Builders;
import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
-import org.opendaylight.yangtools.yang.model.api.NotificationDefinition;
import org.opendaylight.yangtools.yang.model.api.stmt.NotificationEffectiveStatement;
import org.opendaylight.yangtools.yang.model.api.stmt.SchemaNodeIdentifier.Absolute;
import org.slf4j.Logger;
}
}
+ /**
+ * Factory interface for creating instances of {@link AbstractStream}.
+ *
+ * @param <T> {@link AbstractStream} type
+ */
+ @FunctionalInterface
+ public interface StreamFactory<T extends AbstractStream<?>> {
+ /**
+ * Create a stream with the supplied name.
+ *
+ * @param name Stream name
+ * @return An {@link AbstractStream}
+ */
+ @NonNull T createStream(@NonNull String name);
+ }
+
/**
* Holder of all handlers for notifications.
*/
private static final Logger LOG = LoggerFactory.getLogger(ListenersBroker.class);
- // Prefixes for stream names
- private static final String DATA_SUBSCRIPTION = "data-change-event-subscription";
- private static final String NOTIFICATION_STREAM = "notification-stream";
- private static final String DEVICE_NOTIFICATION_STREAM = "device-notification-stream";
-
- private static final QNameModule SAL_REMOTE_AUGMENT = NotificationOutputTypeGrouping.QNAME.getModule();
-
- private static final QNameModule DEVICE_NOTIFICATION_MODULE = SubscribeDeviceNotificationInput.QNAME.getModule();
private static final QName DATASTORE_QNAME =
- QName.create(SAL_REMOTE_AUGMENT, RestconfStreamsConstants.DATASTORE_PARAM_NAME).intern();
- private static final QName SCOPE_QNAME =
- QName.create(SAL_REMOTE_AUGMENT, RestconfStreamsConstants.SCOPE_PARAM_NAME).intern();
+ QName.create(CreateDataChangeEventSubscriptionInput1.QNAME, "datastore").intern();
private static final QName OUTPUT_TYPE_QNAME =
- QName.create(SAL_REMOTE_AUGMENT, "notification-output-type").intern();
+ QName.create(NotificationOutputTypeGrouping.QNAME, "notification-output-type").intern();
private static final QName DEVICE_NOTIFICATION_PATH_QNAME =
- QName.create(DEVICE_NOTIFICATION_MODULE, "path").intern();
+ QName.create(SubscribeDeviceNotificationInput.QNAME, "path").intern();
private static final QName DEVICE_NOTIFICATION_STREAM_PATH =
QName.create(DEVICE_NOTIFICATION_PATH_QNAME, "stream-path").intern();
private static final NodeIdentifier DATASTORE_NODEID = NodeIdentifier.create(DATASTORE_QNAME);
- private static final NodeIdentifier SCOPE_NODEID = NodeIdentifier.create(SCOPE_QNAME);
private static final NodeIdentifier OUTPUT_TYPE_NODEID = NodeIdentifier.create(OUTPUT_TYPE_QNAME);
private static final NodeIdentifier DEVICE_NOTIFICATION_PATH_NODEID =
NodeIdentifier.create(DEVICE_NOTIFICATION_PATH_QNAME);
private static final NodeIdentifier STREAM_NAME_NODEID =
NodeIdentifier.create(QName.create(CreateDataChangeEventSubscriptionOutput.QNAME, "stream-name").intern());
- private final StampedLock dataChangeListenersLock = new StampedLock();
- private final StampedLock notificationListenersLock = new StampedLock();
- private final StampedLock deviceNotificationListenersLock = new StampedLock();
- private final BiMap<String, ListenerAdapter> dataChangeListeners = HashBiMap.create();
- private final BiMap<String, NotificationListenerAdapter> notificationListeners = HashBiMap.create();
- private final BiMap<String, DeviceNotificationListenerAdaptor> deviceNotificationListeners = HashBiMap.create();
+ private final ConcurrentMap<String, AbstractStream<?>> streams = new ConcurrentHashMap<>();
private ListenersBroker() {
// Hidden on purpose
}
/**
- * Gets {@link ListenerAdapter} specified by stream identification.
- *
- * @param streamName Stream name.
- * @return {@link ListenerAdapter} specified by stream name or {@code null} if listener with specified stream name
- * does not exist.
- * @throws NullPointerException in {@code streamName} is {@code null}
- */
- public final @Nullable ListenerAdapter dataChangeListenerFor(final String streamName) {
- requireNonNull(streamName);
-
- final long stamp = dataChangeListenersLock.readLock();
- try {
- return dataChangeListeners.get(streamName);
- } finally {
- dataChangeListenersLock.unlockRead(stamp);
- }
- }
-
- /**
- * Gets {@link NotificationListenerAdapter} specified by stream name.
- *
- * @param streamName Stream name.
- * @return {@link NotificationListenerAdapter} specified by stream name or {@code null} if listener with specified
- * stream name does not exist.
- * @throws NullPointerException in {@code streamName} is {@code null}
- */
- public final @Nullable NotificationListenerAdapter notificationListenerFor(final String streamName) {
- requireNonNull(streamName);
-
- final long stamp = notificationListenersLock.readLock();
- try {
- return notificationListeners.get(streamName);
- } finally {
- notificationListenersLock.unlockRead(stamp);
- }
- }
-
- /**
- * Get listener for device path.
- *
- * @param streamName name.
- * @return {@link DeviceNotificationListenerAdaptor} specified by stream name or {@code null} if listener with
- * specified stream name does not exist.
- * @throws NullPointerException in {@code path} is {@code null}
- */
- public final @Nullable DeviceNotificationListenerAdaptor deviceNotificationListenerFor(final String streamName) {
- requireNonNull(streamName);
-
- final long stamp = deviceNotificationListenersLock.readLock();
- try {
- return deviceNotificationListeners.get(streamName);
- } finally {
- deviceNotificationListenersLock.unlockRead(stamp);
- }
- }
-
- /**
- * Get listener for stream-name.
+ * Get an {@link AbstractStream} by its name.
*
* @param streamName Stream name.
- * @return {@link NotificationListenerAdapter} or {@link ListenerAdapter} object wrapped in {@link Optional}
- * or {@link Optional#empty()} if listener with specified stream name doesn't exist.
- */
- public final @Nullable AbstractStream<?> listenerFor(final String streamName) {
- if (streamName.startsWith(NOTIFICATION_STREAM)) {
- return notificationListenerFor(streamName);
- } else if (streamName.startsWith(DATA_SUBSCRIPTION)) {
- return dataChangeListenerFor(streamName);
- } else if (streamName.startsWith(DEVICE_NOTIFICATION_STREAM)) {
- return deviceNotificationListenerFor(streamName);
- } else {
- return null;
- }
- }
-
- /**
- * Creates new {@link ListenerAdapter} listener using input stream name and path if such listener
- * hasn't been created yet.
- *
- * @param path Path to data in data repository.
- * @param outputType Specific type of output for notifications - XML or JSON.
- * @return Created or existing data-change listener adapter.
+ * @return An {@link AbstractStream}, or {@code null} if the stream with specified name does not exist.
+ * @throws NullPointerException if {@code streamName} is {@code null}
*/
- public final ListenerAdapter registerDataChangeListener(final EffectiveModelContext modelContext,
- final LogicalDatastoreType datastore, final YangInstanceIdentifier path, final Scope scope,
- final NotificationOutputType outputType) {
- final var sb = new StringBuilder(DATA_SUBSCRIPTION)
- .append('/').append(createStreamNameFromUri(IdentifierCodec.serialize(path, modelContext)))
- .append('/').append(RestconfStreamsConstants.DATASTORE_PARAM_NAME).append('=').append(datastore)
- .append('/').append(RestconfStreamsConstants.SCOPE_PARAM_NAME).append('=').append(scope);
- if (outputType != NotificationOutputType.XML) {
- sb.append('/').append(outputType.getName());
- }
-
- final long stamp = dataChangeListenersLock.writeLock();
- try {
- return dataChangeListeners.computeIfAbsent(sb.toString(),
- streamName -> new ListenerAdapter(streamName, outputType, this, datastore, path));
- } finally {
- dataChangeListenersLock.unlockWrite(stamp);
- }
+ public final @Nullable AbstractStream<?> getStream(final String streamName) {
+ return streams.get(streamName);
}
/**
- * Creates new {@link NotificationDefinition} listener using input stream name and schema path
- * if such listener haven't been created yet.
+ * Create an {@link AbstractStream} with a unique name. This method will atomically generate a stream name, create
+ * the corresponding instance and register it
*
- * @param refSchemaCtx reference {@link EffectiveModelContext}
- * @param notifications {@link QName}s of accepted YANG notifications
- * @param outputType Specific type of output for notifications - XML or JSON.
- * @return Created or existing notification listener adapter.
+ * @param <T> Stream type
+ * @param factory Factory for creating the actual stream instance
+ * @return An {@link AbstractStream} instance
+ * @throws NullPointerException if {@code factory} is {@code null}
*/
- public final NotificationListenerAdapter registerNotificationListener(final EffectiveModelContext refSchemaCtx,
- final ImmutableSet<QName> notifications, final NotificationOutputType outputType) {
- final var sb = new StringBuilder(NOTIFICATION_STREAM).append('/');
- var haveFirst = false;
- for (var qname : notifications) {
- final var module = refSchemaCtx.findModuleStatement(qname.getModule())
- .orElseThrow(() -> new RestconfDocumentedException(qname + " refers to an unknown module",
- ErrorType.APPLICATION, ErrorTag.INVALID_VALUE));
- final var stmt = module.findSchemaTreeNode(qname)
- .orElseThrow(() -> new RestconfDocumentedException(qname + " refers to an notification",
- ErrorType.APPLICATION, ErrorTag.INVALID_VALUE));
- if (!(stmt instanceof NotificationEffectiveStatement)) {
- throw new RestconfDocumentedException(qname + " refers to a non-notification",
- ErrorType.APPLICATION, ErrorTag.INVALID_VALUE);
- }
+ public final <T extends AbstractStream<?>> @NonNull T createStream(final StreamFactory<T> factory) {
+ String name;
+ T stream;
+ do {
+ // Use Type 4 (random) UUID. While we could just use it as a plain string, be nice to observers and anchor
+ // it into UUID URN namespace as defined by RFC4122
+ name = "urn:uuid:" + UUID.randomUUID().toString();
+ stream = factory.createStream(name);
+ } while (streams.putIfAbsent(name, stream) != null);
- if (haveFirst) {
- sb.append(',');
- } else {
- haveFirst = true;
- }
- sb.append(module.argument().getLocalName()).append(':').append(qname.getLocalName());
- }
- if (outputType != NotificationOutputType.XML) {
- sb.append('/').append(outputType.getName());
- }
-
- final long stamp = notificationListenersLock.writeLock();
- try {
- return notificationListeners.computeIfAbsent(sb.toString(),
- streamName -> new NotificationListenerAdapter(streamName, outputType, this, notifications));
- } finally {
- notificationListenersLock.unlockWrite(stamp);
- }
+ return stream;
}
/**
- * Creates new {@link DeviceNotificationListenerAdaptor} listener using input stream name and schema path
- * if such listener haven't been created yet.
+ * Remove a particular stream and remove its entry from operational datastore.
*
- * @param deviceName Device name.
- * @param outputType Specific type of output for notifications - XML or JSON.
- * @param refSchemaCtx Schema context of node
- * @param mountPointService Mount point service
- * @return Created or existing device notification listener adapter.
- */
- private DeviceNotificationListenerAdaptor registerDeviceNotificationListener(final String deviceName,
- final NotificationOutputType outputType, final EffectiveModelContext refSchemaCtx,
- final DOMMountPointService mountPointService, final YangInstanceIdentifier path) {
- final var sb = new StringBuilder(DEVICE_NOTIFICATION_STREAM).append('/')
- .append(deviceName);
-
- final long stamp = deviceNotificationListenersLock.writeLock();
- try {
- return deviceNotificationListeners.computeIfAbsent(sb.toString(),
- streamName -> new DeviceNotificationListenerAdaptor(streamName, outputType, this, refSchemaCtx,
- mountPointService, path));
- } finally {
- deviceNotificationListenersLock.unlockWrite(stamp);
- }
- }
-
- /**
- * Removal and closing of all data-change-event and notification listeners.
- */
- public final synchronized void removeAndCloseAllListeners() {
- final long stampNotifications = notificationListenersLock.writeLock();
- final long stampDataChanges = dataChangeListenersLock.writeLock();
- try {
- removeAndCloseAllDataChangeListenersTemplate();
- removeAndCloseAllNotificationListenersTemplate();
- } finally {
- dataChangeListenersLock.unlockWrite(stampDataChanges);
- notificationListenersLock.unlockWrite(stampNotifications);
- }
- }
-
- /**
- * Closes and removes all data-change listeners.
- */
- public final void removeAndCloseAllDataChangeListeners() {
- final long stamp = dataChangeListenersLock.writeLock();
- try {
- removeAndCloseAllDataChangeListenersTemplate();
- } finally {
- dataChangeListenersLock.unlockWrite(stamp);
- }
- }
-
- @SuppressWarnings("checkstyle:IllegalCatch")
- private void removeAndCloseAllDataChangeListenersTemplate() {
- dataChangeListeners.values().forEach(listenerAdapter -> {
- try {
- listenerAdapter.close();
- } catch (Exception e) {
- LOG.error("Failed to close data-change listener {}.", listenerAdapter, e);
- throw new IllegalStateException("Failed to close data-change listener %s.".formatted(listenerAdapter),
- e);
- }
- });
- dataChangeListeners.clear();
- }
-
- /**
- * Closes and removes all notification listeners.
+ * @param stream Stream to remove
*/
- public final void removeAndCloseAllNotificationListeners() {
- final long stamp = notificationListenersLock.writeLock();
- try {
- removeAndCloseAllNotificationListenersTemplate();
- } finally {
- notificationListenersLock.unlockWrite(stamp);
- }
- }
-
- @SuppressWarnings("checkstyle:IllegalCatch")
- private void removeAndCloseAllNotificationListenersTemplate() {
- notificationListeners.values().forEach(listenerAdapter -> {
- try {
- listenerAdapter.close();
- } catch (Exception e) {
- LOG.error("Failed to close notification listener {}.", listenerAdapter, e);
- throw new IllegalStateException("Failed to close notification listener %s.".formatted(listenerAdapter),
- e);
+ final void removeStream(final DOMDataBroker dataBroker, final AbstractStream<?> stream) {
+ // Defensive check to see if we are still tracking the stream
+ final var streamName = stream.getStreamName();
+ if (streams.get(streamName) != stream) {
+ LOG.warn("Stream {} does not match expected instance {}, skipping datastore update", streamName, stream);
+ return;
+ }
+
+ // Now issue a delete operation while the name is still protected by being associated in the map.
+ final var tx = dataBroker.newWriteOnlyTransaction();
+ tx.delete(LogicalDatastoreType.OPERATIONAL, RestconfStateStreams.restconfStateStreamPath(streamName));
+ tx.commit().addCallback(new FutureCallback<CommitInfo>() {
+ @Override
+ public void onSuccess(final CommitInfo result) {
+ LOG.debug("Stream {} removed", streamName);
+ streams.remove(streamName, stream);
}
- });
- notificationListeners.clear();
- }
-
- /**
- * Removes and closes data-change listener of type {@link ListenerAdapter} specified in parameter.
- *
- * @param listener Listener to be closed and removed.
- */
- @SuppressWarnings("checkstyle:IllegalCatch")
- public final void removeAndCloseDataChangeListener(final ListenerAdapter listener) {
- final long stamp = dataChangeListenersLock.writeLock();
- try {
- removeAndCloseDataChangeListenerTemplate(listener);
- } catch (Exception exception) {
- LOG.error("Data-change listener {} cannot be closed.", listener, exception);
- } finally {
- dataChangeListenersLock.unlockWrite(stamp);
- }
- }
- /**
- * Removes and closes data-change listener of type {@link ListenerAdapter} specified in parameter.
- *
- * @param listener Listener to be closed and removed.
- */
- private void removeAndCloseDataChangeListenerTemplate(final ListenerAdapter listener) {
- try {
- requireNonNull(listener).close();
- if (dataChangeListeners.inverse().remove(listener) == null) {
- LOG.warn("There isn't any data-change event stream that would match listener adapter {}.", listener);
+ @Override
+ public void onFailure(final Throwable cause) {
+ LOG.warn("Failed to remove stream {}, operational datastore may be inconsistent", streamName, cause);
+ streams.remove(streamName, stream);
}
- } catch (InterruptedException | ExecutionException e) {
- LOG.error("Data-change listener {} cannot be closed.", listener, e);
- throw new IllegalStateException("Data-change listener %s cannot be closed.".formatted(listener), e);
- }
- }
-
- /**
- * Removes and closes notification listener of type {@link NotificationListenerAdapter} specified in parameter.
- *
- * @param listener Listener to be closed and removed.
- */
- @SuppressWarnings("checkstyle:IllegalCatch")
- public final void removeAndCloseNotificationListener(final NotificationListenerAdapter listener) {
- final long stamp = notificationListenersLock.writeLock();
- try {
- removeAndCloseNotificationListenerTemplate(listener);
- } catch (Exception e) {
- LOG.error("Notification listener {} cannot be closed.", listener, e);
- } finally {
- notificationListenersLock.unlockWrite(stamp);
- }
- }
-
- /**
- * Removes and closes device notification listener of type {@link NotificationListenerAdapter}
- * specified in parameter.
- *
- * @param listener Listener to be closed and removed.
- */
- @SuppressWarnings("checkstyle:IllegalCatch")
- public final void removeAndCloseDeviceNotificationListener(final DeviceNotificationListenerAdaptor listener) {
- final long stamp = deviceNotificationListenersLock.writeLock();
- try {
- requireNonNull(listener);
- if (deviceNotificationListeners.inverse().remove(listener) == null) {
- LOG.warn("There isn't any device notification stream that would match listener adapter {}.", listener);
- }
- } catch (final Exception exception) {
- LOG.error("Device Notification listener {} cannot be closed.", listener, exception);
- } finally {
- deviceNotificationListenersLock.unlockWrite(stamp);
- }
- }
-
- private void removeAndCloseNotificationListenerTemplate(final NotificationListenerAdapter listener) {
- try {
- requireNonNull(listener).close();
- if (notificationListeners.inverse().remove(listener) == null) {
- LOG.warn("There isn't any notification stream that would match listener adapter {}.", listener);
- }
- } catch (InterruptedException | ExecutionException e) {
- LOG.error("Notification listener {} cannot be closed.", listener, e);
- throw new IllegalStateException("Notification listener %s cannot be closed.".formatted(listener), e);
- }
- }
-
- /**
- * Removal and closing of general listener (data-change or notification listener).
- *
- * @param stream Stream to be closed and removed from cache.
- */
- final void removeAndCloseListener(final AbstractStream<?> stream) {
- requireNonNull(stream);
- if (stream instanceof ListenerAdapter dataChange) {
- removeAndCloseDataChangeListener(dataChange);
- } else if (stream instanceof NotificationListenerAdapter notification) {
- removeAndCloseNotificationListener(notification);
- }
+ }, MoreExecutors.directExecutor());
}
/**
* @param uri URI for creation of stream name.
* @return String representation of stream name.
*/
- private static String createStreamNameFromUri(final String uri) {
- String result = requireNonNull(uri);
- while (true) {
- if (result.startsWith(URLConstants.BASE_PATH)) {
- result = result.substring(URLConstants.BASE_PATH.length());
- } else if (result.startsWith("/")) {
- result = result.substring(1);
- } else {
- break;
- }
- }
- if (result.endsWith("/")) {
- result = result.substring(0, result.length() - 1);
- }
- return result;
- }
+// private static String createStreamNameFromUri(final String uri) {
+// String result = requireNonNull(uri);
+// while (true) {
+// if (result.startsWith(URLConstants.BASE_PATH)) {
+// result = result.substring(URLConstants.BASE_PATH.length());
+// } else if (result.startsWith("/")) {
+// result = result.substring(1);
+// } else {
+// break;
+// }
+// }
+// if (result.endsWith("/")) {
+// result = result.substring(0, result.length() - 1);
+// }
+// return result;
+// }
/**
* Prepare URL from base name and stream name.
*/
public abstract @NonNull URI prepareUriByStreamName(UriInfo uriInfo, String streamName);
- /**
- * Register listener by streamName in identifier to listen to yang notifications, and put or delete information
- * about listener to DS according to ietf-restconf-monitoring.
- *
- * @param identifier Name of the stream.
- * @param uriInfo URI information.
- * @param notificationQueryParams Query parameters of notification.
- * @param handlersHolder Holder of handlers for notifications.
- * @return Stream location for listening.
- */
- public final @NonNull URI subscribeToYangStream(final String identifier, final UriInfo uriInfo,
- final ReceiveEventsParams notificationQueryParams, final HandlersHolder handlersHolder) {
- final String streamName = createStreamNameFromUri(identifier);
- if (isNullOrEmpty(streamName)) {
- throw new RestconfDocumentedException("Stream name is empty.", ErrorType.PROTOCOL, ErrorTag.INVALID_VALUE);
- }
-
- final var notificationListenerAdapter = notificationListenerFor(streamName);
- if (notificationListenerAdapter == null) {
- throw new RestconfDocumentedException("Stream with name %s was not found.".formatted(streamName),
- ErrorType.PROTOCOL, ErrorTag.UNKNOWN_ELEMENT);
- }
-
- final URI uri = prepareUriByStreamName(uriInfo, streamName);
- notificationListenerAdapter.setQueryParams(notificationQueryParams);
- notificationListenerAdapter.listen(handlersHolder.notificationService());
- final DOMDataBroker dataBroker = handlersHolder.dataBroker();
- notificationListenerAdapter.setCloseVars(dataBroker, handlersHolder.databindProvider());
- final MapEntryNode mapToStreams = RestconfStateStreams.notificationStreamEntry(streamName,
- notificationListenerAdapter.qnames(), notificationListenerAdapter.getOutputType(), uri);
-
- // FIXME: how does this correlate with the transaction notificationListenerAdapter.close() will do?
- final DOMDataTreeWriteTransaction writeTransaction = dataBroker.newWriteOnlyTransaction();
- writeDataToDS(writeTransaction, mapToStreams);
- submitData(writeTransaction);
- return uri;
- }
-
- /**
- * Register listener by streamName in identifier to listen to data change notifications, and put or delete
- * information about listener to DS according to ietf-restconf-monitoring.
- *
- * @param identifier Identifier as stream name.
- * @param uriInfo Base URI information.
- * @param notificationQueryParams Query parameters of notification.
- * @param handlersHolder Holder of handlers for notifications.
- * @return Location for listening.
- */
- public final URI subscribeToDataStream(final String identifier, final UriInfo uriInfo,
- final ReceiveEventsParams notificationQueryParams, final HandlersHolder handlersHolder) {
- final var streamName = createStreamNameFromUri(identifier);
- final var listener = dataChangeListenerFor(streamName);
- if (listener == null) {
- throw new RestconfDocumentedException("No listener found for stream " + streamName,
- ErrorType.APPLICATION, ErrorTag.DATA_MISSING);
- }
-
- listener.setQueryParams(notificationQueryParams);
-
- final var dataBroker = handlersHolder.dataBroker();
- final var schemaHandler = handlersHolder.databindProvider();
- listener.setCloseVars(dataBroker, schemaHandler);
- listener.listen(dataBroker);
-
- final var uri = prepareUriByStreamName(uriInfo, streamName);
- final var schemaContext = schemaHandler.currentContext().modelContext();
- final var serializedPath = IdentifierCodec.serialize(listener.getPath(), schemaContext);
-
- final var mapToStreams = RestconfStateStreams.dataChangeStreamEntry(listener.getPath(),
- listener.getOutputType(), uri, schemaContext, serializedPath);
- final var writeTransaction = dataBroker.newWriteOnlyTransaction();
- writeDataToDS(writeTransaction, mapToStreams);
- submitData(writeTransaction);
- return uri;
- }
-
// FIXME: callers are utter duplicates, refactor them
- private static void writeDataToDS(final DOMDataTreeWriteOperations tx, final MapEntryNode mapToStreams) {
- // FIXME: use put() here
- tx.merge(LogicalDatastoreType.OPERATIONAL, RestconfStateStreams.restconfStateStreamPath(mapToStreams.name()),
- mapToStreams);
- }
-
- private static void submitData(final DOMDataTreeWriteTransaction readWriteTransaction) {
- try {
- readWriteTransaction.commit().get();
- } catch (final InterruptedException | ExecutionException e) {
- throw new RestconfDocumentedException("Problem while putting data to DS.", e);
- }
- }
-
+// private static void writeDataToDS(final DOMDataTreeWriteOperations tx, final MapEntryNode mapToStreams) {
+// // FIXME: use put() here
+// tx.merge(LogicalDatastoreType.OPERATIONAL, RestconfStateStreams.restconfStateStreamPath(mapToStreams.name()),
+// mapToStreams);
+// }
+//
+// private static void submitData(final DOMDataTreeWriteTransaction readWriteTransaction) {
+// try {
+// readWriteTransaction.commit().get();
+// } catch (final InterruptedException | ExecutionException e) {
+// throw new RestconfDocumentedException("Problem while putting data to DS.", e);
+// }
+// }
/**
* Create data-change-event stream with POST operation via RPC.
public final RestconfFuture<Optional<ContainerNode>> createDataChangeNotifiStream(final ContainerNode input,
final EffectiveModelContext modelContext) {
final var datastoreName = extractStringLeaf(input, DATASTORE_NODEID);
- final var scopeName = extractStringLeaf(input, SCOPE_NODEID);
- final var adapter = registerDataChangeListener(modelContext,
- datastoreName != null ? LogicalDatastoreType.valueOf(datastoreName) : LogicalDatastoreType.CONFIGURATION,
- preparePath(input), scopeName != null ? Scope.ofName(scopeName) : Scope.BASE, prepareOutputType(input));
+ final var datastore = datastoreName != null ? LogicalDatastoreType.valueOf(datastoreName)
+ : LogicalDatastoreType.CONFIGURATION;
+ final var path = preparePath(input);
+ final var outputType = prepareOutputType(input);
+ final var adapter = createStream(name -> new ListenerAdapter(name, outputType, this, datastore, path));
// building of output
return RestconfFuture.of(Optional.of(Builders.containerBuilder()
.build()));
}
+// /**
+// * Register listener by streamName in identifier to listen to data change notifications, and put or delete
+// * information about listener to DS according to ietf-restconf-monitoring.
+// *
+// * @param identifier Identifier as stream name.
+// * @param uriInfo Base URI information.
+// * @param notificationQueryParams Query parameters of notification.
+// * @param handlersHolder Holder of handlers for notifications.
+// * @return Location for listening.
+// */
+// public final URI subscribeToDataStream(final String identifier, final UriInfo uriInfo,
+// final ReceiveEventsParams notificationQueryParams, final HandlersHolder handlersHolder) {
+// final var streamName = createStreamNameFromUri(identifier);
+// final var listener = dataChangeListenerFor(streamName);
+// if (listener == null) {
+// throw new RestconfDocumentedException("No listener found for stream " + streamName,
+// ErrorType.APPLICATION, ErrorTag.DATA_MISSING);
+// }
+//
+// listener.setQueryParams(notificationQueryParams);
+//
+// final var dataBroker = handlersHolder.dataBroker();
+// final var schemaHandler = handlersHolder.databindProvider();
+// listener.setCloseVars(dataBroker, schemaHandler);
+// listener.listen(dataBroker);
+//
+// final var uri = prepareUriByStreamName(uriInfo, streamName);
+// final var schemaContext = schemaHandler.currentContext().modelContext();
+// final var serializedPath = IdentifierCodec.serialize(listener.getPath(), schemaContext);
+//
+// final var mapToStreams = RestconfStateStreams.dataChangeStreamEntry(listener.getPath(),
+// listener.getOutputType(), uri, schemaContext, serializedPath);
+// final var writeTransaction = dataBroker.newWriteOnlyTransaction();
+// writeDataToDS(writeTransaction, mapToStreams);
+// submitData(writeTransaction);
+// return uri;
+// }
+
// FIXME: this really should be a normal RPC implementation
public final RestconfFuture<Optional<ContainerNode>> createNotificationStream(final ContainerNode input,
final EffectiveModelContext modelContext) {
}
}
+// FIXME: use this block to create a stream description
+// final var module = refSchemaCtx.findModuleStatement(qname.getModule())
+// .orElseThrow(() -> new RestconfDocumentedException(qname + " refers to an unknown module",
+// ErrorType.APPLICATION, ErrorTag.INVALID_VALUE));
+// final var stmt = module.findSchemaTreeNode(qname)
+// .orElseThrow(() -> new RestconfDocumentedException(qname + " refers to an notification",
+// ErrorType.APPLICATION, ErrorTag.INVALID_VALUE));
+// if (!(stmt instanceof NotificationEffectiveStatement)) {
+// throw new RestconfDocumentedException(qname + " refers to a non-notification",
+// ErrorType.APPLICATION, ErrorTag.INVALID_VALUE);
+// }
+//
+// if (haveFirst) {
+// sb.append(',');
+// } else {
+// haveFirst = true;
+// }
+// sb.append(module.argument().getLocalName()).append(':').append(qname.getLocalName());
+
// registration of the listener
- final var adapter = registerNotificationListener(modelContext, qnames, prepareOutputType(input));
+ final var outputType = prepareOutputType(input);
+ final var adapter = createStream(name -> new NotificationListenerAdapter(name, outputType, this, qnames));
return RestconfFuture.of(Optional.of(Builders.containerBuilder()
.withNodeIdentifier(SAL_REMOTE_OUTPUT_NODEID)
.build()));
}
+ /**
+ * Register listener by streamName in identifier to listen to yang notifications, and put or delete information
+ * about listener to DS according to ietf-restconf-monitoring.
+ *
+ * @param identifier Name of the stream.
+ * @param uriInfo URI information.
+ * @param notificationQueryParams Query parameters of notification.
+ * @param handlersHolder Holder of handlers for notifications.
+ * @return Stream location for listening.
+ */
+// public final @NonNull URI subscribeToYangStream(final String identifier, final UriInfo uriInfo,
+// final ReceiveEventsParams notificationQueryParams, final HandlersHolder handlersHolder) {
+// final String streamName = createStreamNameFromUri(identifier);
+// if (isNullOrEmpty(streamName)) {
+// throw new RestconfDocumentedException("Stream name is empty", ErrorType.PROTOCOL, ErrorTag.INVALID_VALUE);
+// }
+//
+// final var notificationListenerAdapter = notificationListenerFor(streamName);
+// if (notificationListenerAdapter == null) {
+// throw new RestconfDocumentedException("Stream with name %s was not found".formatted(streamName),
+// ErrorType.PROTOCOL, ErrorTag.UNKNOWN_ELEMENT);
+// }
+//
+// final URI uri = prepareUriByStreamName(uriInfo, streamName);
+// notificationListenerAdapter.setQueryParams(notificationQueryParams);
+// notificationListenerAdapter.listen(handlersHolder.notificationService());
+// final DOMDataBroker dataBroker = handlersHolder.dataBroker();
+// notificationListenerAdapter.setCloseVars(dataBroker, handlersHolder.databindProvider());
+// final MapEntryNode mapToStreams = RestconfStateStreams.notificationStreamEntry(streamName,
+// notificationListenerAdapter.qnames(), notificationListenerAdapter.getOutputType(), uri);
+//
+// // FIXME: how does this correlate with the transaction notificationListenerAdapter.close() will do?
+// final DOMDataTreeWriteTransaction writeTransaction = dataBroker.newWriteOnlyTransaction();
+// writeDataToDS(writeTransaction, mapToStreams);
+// submitData(writeTransaction);
+// return uri;
+// }
+
/**
* Create device notification stream.
*
* @return {@link DOMRpcResult} - Output of RPC - example in JSON
*/
// FIXME: this should be an RPC invocation
- public final RestconfFuture<Optional<ContainerNode>> createDeviceNotificationListener(final ContainerNode input,
+ public final RestconfFuture<Optional<ContainerNode>> createDeviceNotificationStream(final ContainerNode input,
final String baseUrl, final DOMMountPointService mountPointService) {
// parsing out of container with settings and path
// FIXME: ugly cast
throw new RestconfDocumentedException("Target list uses multiple keys", ErrorType.APPLICATION,
ErrorTag.INVALID_VALUE);
}
- final String deviceName = listId.values().iterator().next().toString();
final DOMMountPoint mountPoint = mountPointService.getMountPoint(path)
.orElseThrow(() -> new RestconfDocumentedException("Mount point not available", ErrorType.APPLICATION,
ErrorTag.OPERATION_FAILED);
}
- final var notificationListenerAdapter = registerDeviceNotificationListener(deviceName,
- prepareOutputType(input), mountModelContext, mountPointService, mountPoint.getIdentifier());
+// FIXME: use this for description?
+// final String deviceName = listId.values().iterator().next().toString();
+
+ final var outputType = prepareOutputType(input);
+ final var notificationListenerAdapter = createStream(
+ streamName -> new DeviceNotificationListenerAdaptor(streamName, outputType, this, mountModelContext,
+ mountPointService, mountPoint.getIdentifier()));
notificationListenerAdapter.listen(mountNotifService, notificationPaths);
return RestconfFuture.of(Optional.of(Builders.containerBuilder()
+++ /dev/null
-/*
- * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.restconf.nb.rfc8040.streams;
-
-/**
- * Constants for streams.
- */
-public final class RestconfStreamsConstants {
- public static final String DATASTORE_PARAM_NAME = "datastore";
- public static final String SCOPE_PARAM_NAME = "scope";
-
- private RestconfStreamsConstants() {
- // Hidden on purpose
- }
-}
*/
package org.opendaylight.restconf.nb.rfc8040.streams;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.CharMatcher;
import com.google.common.base.Strings;
import java.util.concurrent.ScheduledExecutorService;
/**
* Handling of SSE session close event. Removal of subscription at listener and stopping of the ping process.
*/
- public synchronized void close() {
+ @VisibleForTesting
+ synchronized void close() {
listener.removeSubscriber(this);
stopPingProcess();
}
+ @Override
+ public synchronized boolean isConnected() {
+ return !sink.isClosed();
+ }
+
/**
* Sending of string message to outbound Server-Sent Events channel {@link SseEventSink}. SSE is automatically split
* to fragments with new line character. If the maximum fragment length is set to non-zero positive value and input
}
}
+ @Override
+ public synchronized void endOfStream() {
+ stopPingProcess();
+ sink.close();
+ }
+
/**
* Split message to fragments. SSE automatically fragment string with new line character.
* For manual fragmentation we will remove all new line characters
}
}
- @Override
- public synchronized boolean isConnected() {
- return !sink.isClosed();
- }
-
// TODO:return some type of identification of connection
@Override
public String toString() {
* @param data Message data to be send.
*/
void sendDataMessage(String data);
+
+ /**
+ * Called when the stream has reached its end. The handler should close all underlying resources.
+ */
+ void endOfStream();
}
final var path = req.getRequestURI().getPath();
if (path.startsWith(STREAMS_PREFIX)) {
final var streamName = path.substring(STREAMS_PREFIX.length());
- final var listener = listenersBroker.listenerFor(streamName);
+ final var listener = listenersBroker.getStream(streamName);
if (listener != null) {
LOG.debug("Listener for stream with name {} has been found, web-socket session handler will be created",
streamName);
}
}
+ @Override
+ public synchronized void endOfStream() {
+ if (session != null && session.isOpen()) {
+ session.close();
+ }
+ stopPingProcess();
+ }
+
/**
* Sensing of string message to remote endpoint of {@link org.eclipse.jetty.websocket.api.Session}. If the maximum
* fragment length is set to non-zero positive value and input message exceeds this value, message is fragmented
}
if (session != null && session.isOpen()) {
- final RemoteEndpoint remoteEndpoint = session.getRemote();
+ final var remoteEndpoint = session.getRemote();
if (maximumFragmentLength == 0 || message.length() <= maximumFragmentLength) {
sendDataMessage(message, remoteEndpoint);
} else {
}
private static List<String> splitMessageToFragments(final String inputMessage, final int maximumFragmentLength) {
- final List<String> parts = new ArrayList<>();
+ final var parts = new ArrayList<String>();
int length = inputMessage.length();
for (int i = 0; i < length; i += maximumFragmentLength) {
parts.add(inputMessage.substring(i, Math.min(length, i + maximumFragmentLength)));
*/
package org.opendaylight.restconf.nb.rfc8040.streams;
+import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.io.StringWriter;
import java.time.Instant;
import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
final class XMLNotificationFormatter extends NotificationFormatter {
- private static final XMLNotificationFormatter EMPTY = new XMLNotificationFormatter(TextParameters.EMPTY);
-
+ @VisibleForTesting
+ static final XMLNotificationFormatter EMPTY = new XMLNotificationFormatter(TextParameters.EMPTY);
static final NotificationFormatterFactory FACTORY = new NotificationFormatterFactory(EMPTY) {
@Override
XMLNotificationFormatter newFormatter(final TextParameters textParams) {
*/
package org.opendaylight.restconf.nb.rfc8040.streams;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.when;
-import com.google.common.collect.ImmutableSet;
import java.time.Instant;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.junit.MockitoJUnitRunner;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
import org.opendaylight.mdsal.dom.api.DOMNotification;
-import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev140708.NotificationOutputTypeGrouping.NotificationOutputType;
import org.opendaylight.yangtools.yang.common.QName;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifierWithPredicates;
import org.opendaylight.yangtools.yang.data.impl.schema.Builders;
import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
import org.opendaylight.yangtools.yang.model.api.stmt.SchemaNodeIdentifier.Absolute;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-@RunWith(MockitoJUnitRunner.StrictStubs.class)
-public class JsonNotificationListenerTest extends AbstractNotificationListenerTest {
- private static final Logger LOG = LoggerFactory.getLogger(JsonNotificationListenerTest.class);
-
- private final ListenersBroker listenersBroker = new ListenersBroker.ServerSentEvents();
+@ExtendWith(MockitoExtension.class)
+class JSONNotificationFormatterTest extends AbstractNotificationListenerTest {
+ @Mock
+ private DOMNotification notificationData;
@Test
- public void notifi_leafTest() throws Exception {
+ void notifi_leafTest() throws Exception {
final QName schemaPathNotifi = QName.create(MODULE, "notifi-leaf");
- final DOMNotification notificationData = mock(DOMNotification.class);
-
- final LeafNode<String> leaf = mockLeaf(QName.create(MODULE, "lf"));
- final ContainerNode notifiBody = mockCont(schemaPathNotifi, leaf);
+ final var leaf = mockLeaf(QName.create(MODULE, "lf"));
+ final var notifiBody = mockCont(schemaPathNotifi, leaf);
when(notificationData.getType()).thenReturn(Absolute.of(schemaPathNotifi));
when(notificationData.getBody()).thenReturn(notifiBody);
- final String result = prepareJson(notificationData, schemaPathNotifi);
-
- LOG.info("json result: {}", result);
+ final String result = prepareJson(schemaPathNotifi);
assertTrue(result.contains("ietf-restconf:notification"));
assertTrue(result.contains("event-time"));
}
@Test
- public void notifi_cont_leafTest() throws Exception {
+ void notifi_cont_leafTest() throws Exception {
final QName schemaPathNotifi = QName.create(MODULE, "notifi-cont");
- final DOMNotification notificationData = mock(DOMNotification.class);
-
- final LeafNode<String> leaf = mockLeaf(QName.create(MODULE, "lf"));
- final ContainerNode cont = mockCont(QName.create(MODULE, "cont"), leaf);
- final ContainerNode notifiBody = mockCont(schemaPathNotifi, cont);
+ final var leaf = mockLeaf(QName.create(MODULE, "lf"));
+ final var cont = mockCont(QName.create(MODULE, "cont"), leaf);
+ final var notifiBody = mockCont(schemaPathNotifi, cont);
when(notificationData.getType()).thenReturn(Absolute.of(schemaPathNotifi));
when(notificationData.getBody()).thenReturn(notifiBody);
- final String result = prepareJson(notificationData, schemaPathNotifi);
+ final String result = prepareJson(schemaPathNotifi);
assertTrue(result.contains("ietf-restconf:notification"));
assertTrue(result.contains("event-time"));
}
@Test
- public void notifi_list_Test() throws Exception {
+ void notifi_list_Test() throws Exception {
final QName schemaPathNotifi = QName.create(MODULE, "notifi-list");
- final DOMNotification notificationData = mock(DOMNotification.class);
-
- final LeafNode<String> leaf = mockLeaf(QName.create(MODULE, "lf"));
- final MapEntryNode entry = mockMapEntry(QName.create(MODULE, "lst"), leaf);
- final ContainerNode notifiBody = mockCont(schemaPathNotifi, Builders.mapBuilder()
+ final var leaf = mockLeaf(QName.create(MODULE, "lf"));
+ final var entry = mockMapEntry(QName.create(MODULE, "lst"), leaf);
+ final var notifiBody = mockCont(schemaPathNotifi, Builders.mapBuilder()
.withNodeIdentifier(NodeIdentifier.create(QName.create(MODULE, "lst")))
.withChild(entry)
.build());
when(notificationData.getType()).thenReturn(Absolute.of(schemaPathNotifi));
when(notificationData.getBody()).thenReturn(notifiBody);
- final String result = prepareJson(notificationData, schemaPathNotifi);
+ final String result = prepareJson(schemaPathNotifi);
assertTrue(result.contains("ietf-restconf:notification"));
assertTrue(result.contains("event-time"));
}
@Test
- public void notifi_grpTest() throws Exception {
+ void notifi_grpTest() throws Exception {
final QName schemaPathNotifi = QName.create(MODULE, "notifi-grp");
- final DOMNotification notificationData = mock(DOMNotification.class);
-
- final LeafNode<String> leaf = mockLeaf(QName.create(MODULE, "lf"));
- final ContainerNode notifiBody = mockCont(schemaPathNotifi, leaf);
+ final var leaf = mockLeaf(QName.create(MODULE, "lf"));
+ final var notifiBody = mockCont(schemaPathNotifi, leaf);
when(notificationData.getType()).thenReturn(Absolute.of(schemaPathNotifi));
when(notificationData.getBody()).thenReturn(notifiBody);
- final String result = prepareJson(notificationData, schemaPathNotifi);
+ final String result = prepareJson(schemaPathNotifi);
assertTrue(result.contains("ietf-restconf:notification"));
assertTrue(result.contains("event-time"));
}
@Test
- public void notifi_augmTest() throws Exception {
+ void notifi_augmTest() throws Exception {
final QName schemaPathNotifi = QName.create(MODULE, "notifi-augm");
- final DOMNotification notificationData = mock(DOMNotification.class);
-
- final LeafNode<String> leaf = mockLeaf(QName.create(MODULE, "lf-augm"));
- final ContainerNode notifiBody = mockCont(schemaPathNotifi, leaf);
+ final var leaf = mockLeaf(QName.create(MODULE, "lf-augm"));
+ final var notifiBody = mockCont(schemaPathNotifi, leaf);
when(notificationData.getType()).thenReturn(Absolute.of(schemaPathNotifi));
when(notificationData.getBody()).thenReturn(notifiBody);
- final String result = prepareJson(notificationData, schemaPathNotifi);
+ final String result = prepareJson(schemaPathNotifi);
assertTrue(result.contains("ietf-restconf:notification"));
assertTrue(result.contains("event-time"));
return ImmutableNodes.leafNode(leafQName, "value");
}
- private String prepareJson(final DOMNotification notificationData, final QName schemaPathNotifi)
- throws Exception {
- final var ret = listenersBroker.registerNotificationListener(MODEL_CONTEXT, ImmutableSet.of(schemaPathNotifi),
- NotificationOutputType.JSON).formatter().eventData(MODEL_CONTEXT, notificationData, Instant.now());
+ private String prepareJson(final QName schemaPathNotifi) throws Exception {
+ final var ret = JSONNotificationFormatter.EMPTY.eventData(MODEL_CONTEXT, notificationData, Instant.now());
assertNotNull(ret);
return ret;
}
*/
package org.opendaylight.restconf.nb.rfc8040.streams;
-import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.CoreMatchers.startsWith;
import static org.hamcrest.MatcherAssert.assertThat;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
-import java.util.function.Function;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.junit.MockitoJUnitRunner;
+import java.util.UUID;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.junit.jupiter.MockitoExtension;
import org.opendaylight.restconf.common.errors.RestconfDocumentedException;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.remote.rev140114.CreateDataChangeEventSubscriptionOutput;
import org.opendaylight.yangtools.yang.common.ErrorTag;
import org.opendaylight.yangtools.yang.common.ErrorType;
import org.opendaylight.yangtools.yang.common.QName;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
+import org.opendaylight.yangtools.yang.data.api.schema.LeafNode;
import org.opendaylight.yangtools.yang.data.impl.schema.Builders;
import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
import org.opendaylight.yangtools.yang.model.api.ContainerLike;
-import org.opendaylight.yangtools.yang.model.api.DataSchemaNode;
import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
import org.opendaylight.yangtools.yang.model.api.LeafSchemaNode;
-import org.opendaylight.yangtools.yang.model.api.Module;
import org.opendaylight.yangtools.yang.model.api.RpcDefinition;
import org.opendaylight.yangtools.yang.test.util.YangParserTestUtils;
-@RunWith(MockitoJUnitRunner.StrictStubs.class)
-public class ListenersBrokerTest {
- private static EffectiveModelContext SCHEMA_CTX;
+@ExtendWith(MockitoExtension.class)
+class ListenersBrokerTest {
+ private static final EffectiveModelContext SCHEMA_CTX = YangParserTestUtils.parseYangResourceDirectory("/streams");
private final ListenersBroker listenersBroker = new ListenersBroker.ServerSentEvents();
- @BeforeClass
- public static void setUp() {
- SCHEMA_CTX = YangParserTestUtils.parseYangResourceDirectory("/streams");
- }
-
@Test
- public void createStreamTest() {
- assertEquals(prepareDomPayload("create-data-change-event-subscription",
- RpcDefinition::getOutput,
- "data-change-event-subscription/toaster:toaster/datastore=CONFIGURATION/scope=BASE", "stream-name"),
- listenersBroker.createDataChangeNotifiStream(
- prepareDomPayload("create-data-change-event-subscription", RpcDefinition::getInput, "toaster", "path"),
- SCHEMA_CTX).getOrThrow().orElseThrow());
+ void createStreamTest() {
+ final var output = assertInstanceOf(ContainerNode.class, listenersBroker.createDataChangeNotifiStream(
+ prepareDomPayload("create-data-change-event-subscription", "toaster", "path"),
+ SCHEMA_CTX).getOrThrow().orElse(null));
+
+ assertEquals(new NodeIdentifier(CreateDataChangeEventSubscriptionOutput.QNAME), output.name());
+ assertEquals(1, output.size());
+
+ final var streamName = assertInstanceOf(LeafNode.class,
+ output.childByArg(new NodeIdentifier(
+ QName.create(CreateDataChangeEventSubscriptionOutput.QNAME, "stream-name"))));
+ final var name = assertInstanceOf(String.class, streamName.body());
+ assertEquals(45, name.length());
+ assertThat(name, startsWith("urn:uuid:"));
+ assertNotNull(UUID.fromString(name.substring(9)));
}
@Test
- public void createStreamWrongValueTest() {
- final var payload = prepareDomPayload("create-data-change-event-subscription", RpcDefinition::getInput,
- "String value", "path");
+ void createStreamWrongValueTest() {
+ final var payload = prepareDomPayload("create-data-change-event-subscription", "String value", "path");
final var errors = assertThrows(RestconfDocumentedException.class,
() -> listenersBroker.createDataChangeNotifiStream(payload, SCHEMA_CTX)).getErrors();
assertEquals(1, errors.size());
}
@Test
- public void createStreamWrongInputRpcTest() {
- final var payload = prepareDomPayload("create-data-change-event-subscription2", RpcDefinition::getInput,
- "toaster", "path2");
+ void createStreamWrongInputRpcTest() {
+ final var payload = prepareDomPayload("create-data-change-event-subscription2", "toaster", "path2");
final var errors = assertThrows(RestconfDocumentedException.class,
() -> listenersBroker.createDataChangeNotifiStream(payload, SCHEMA_CTX)).getErrors();
assertEquals(1, errors.size());
assertEquals("Instance identifier was not normalized correctly", error.getErrorMessage());
}
- private static ContainerNode prepareDomPayload(final String rpcName,
- final Function<RpcDefinition, ContainerLike> rpcToContainer, final String toasterValue,
+ private static ContainerNode prepareDomPayload(final String rpcName, final String toasterValue,
final String inputOutputName) {
- final Module rpcModule = SCHEMA_CTX.findModules("sal-remote").iterator().next();
+ final var rpcModule = SCHEMA_CTX.findModules("sal-remote").iterator().next();
final QName rpcQName = QName.create(rpcModule.getQNameModule(), rpcName);
ContainerLike containerSchemaNode = null;
for (final RpcDefinition rpc : rpcModule.getRpcs()) {
if (rpcQName.isEqualWithoutRevision(rpc.getQName())) {
- containerSchemaNode = rpcToContainer.apply(rpc);
+ containerSchemaNode = rpc.getInput();
break;
}
}
assertNotNull(containerSchemaNode);
final QName lfQName = QName.create(rpcModule.getQNameModule(), inputOutputName);
- final DataSchemaNode lfSchemaNode = containerSchemaNode.getDataChildByName(lfQName);
- assertThat(lfSchemaNode, instanceOf(LeafSchemaNode.class));
+ assertInstanceOf(LeafSchemaNode.class, containerSchemaNode.dataChildByName(lfQName));
final Object o;
if ("toaster".equals(toasterValue)) {
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
-import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev140708.CreateDataChangeEventSubscriptionInput1.Scope;
-import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev140708.NotificationOutputTypeGrouping;
+import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev140708.NotificationOutputTypeGrouping.NotificationOutputType;
import org.opendaylight.yangtools.yang.common.QName;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
@ExtendWith(MockitoExtension.class)
class WebSocketFactoryTest extends AbstractNotificationListenerTest {
- private static final String REGISTERED_STREAM_NAME = "data-change-event-subscription/"
- + "toaster:toaster/datastore=CONFIGURATION/scope=SUBTREE/JSON";
-
private final ListenersBroker listenersBroker = new ListenersBroker.ServerSentEvents();
@Mock
private ServletUpgradeResponse upgradeResponse;
private WebSocketFactory webSocketFactory;
+ private String streamName;
@BeforeEach
void prepareListenersBroker() {
webSocketFactory = new WebSocketFactory(execService, listenersBroker, 5000, 2000);
- listenersBroker.registerDataChangeListener(MODEL_CONTEXT, LogicalDatastoreType.CONFIGURATION,
- YangInstanceIdentifier.of(QName.create("http://netconfcentral.org/ns/toaster", "2009-11-20", "toaster")),
- Scope.SUBTREE, NotificationOutputTypeGrouping.NotificationOutputType.JSON);
+ streamName = listenersBroker.createStream(name -> new ListenerAdapter(name, NotificationOutputType.JSON,
+ listenersBroker, LogicalDatastoreType.CONFIGURATION,
+ YangInstanceIdentifier.of(QName.create("http://netconfcentral.org/ns/toaster", "2009-11-20", "toaster"))))
+ .getStreamName();
}
@Test
void createWebSocketSuccessfully() {
- doReturn(URI.create("https://localhost:8181/rests/streams/" + REGISTERED_STREAM_NAME))
+ doReturn(URI.create("https://localhost:8181/rests/streams/" + streamName))
.when(upgradeRequest).getRequestURI();
assertInstanceOf(WebSocketSessionHandler.class,
@Test
void createWebSocketUnsuccessfully() {
- doReturn(URI.create("https://localhost:8181/rests/streams/" + REGISTERED_STREAM_NAME + "/toasterStatus"))
+ doReturn(URI.create("https://localhost:8181/rests/streams/" + streamName + "/toasterStatus"))
.when(upgradeRequest).getRequestURI();
assertNull(webSocketFactory.createWebSocket(upgradeRequest, upgradeResponse));
*/
package org.opendaylight.restconf.nb.rfc8040.streams;
-import static org.junit.Assert.assertNotNull;
-import static org.mockito.Mockito.mock;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.mockito.Mockito.when;
-import com.google.common.collect.ImmutableSet;
import java.time.Instant;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.junit.MockitoJUnitRunner;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
import org.opendaylight.mdsal.dom.api.DOMNotification;
-import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev140708.NotificationOutputTypeGrouping.NotificationOutputType;
import org.opendaylight.yangtools.yang.common.QName;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifierWithPredicates;
import org.opendaylight.yangtools.yang.model.api.stmt.SchemaNodeIdentifier.Absolute;
import org.xmlunit.assertj.XmlAssert;
-@RunWith(MockitoJUnitRunner.StrictStubs.class)
-public class XmlNotificationListenerTest extends AbstractNotificationListenerTest {
- private final ListenersBroker listenersBroker = new ListenersBroker.ServerSentEvents();
+@ExtendWith(MockitoExtension.class)
+class XMLNotificationFormatterTest extends AbstractNotificationListenerTest {
+ @Mock
+ private DOMNotification notificationData;
@Test
- public void notifi_leafTest() throws Exception {
+ void notifi_leafTest() throws Exception {
final QName schemaPathNotifi = QName.create(MODULE, "notifi-leaf");
- final DOMNotification notificationData = mock(DOMNotification.class);
-
- final LeafNode<String> leaf = mockLeaf(QName.create(MODULE, "lf"));
- final ContainerNode notifiBody = mockCont(schemaPathNotifi, leaf);
+ final var leaf = mockLeaf(QName.create(MODULE, "lf"));
+ final var notifiBody = mockCont(schemaPathNotifi, leaf);
when(notificationData.getType()).thenReturn(Absolute.of(schemaPathNotifi));
when(notificationData.getBody()).thenReturn(notifiBody);
- assertXmlMatches(prepareXmlResult(notificationData, schemaPathNotifi), """
+ assertXmlMatches(prepareXmlResult(schemaPathNotifi), """
<notification xmlns="urn:ietf:params:xml:ns:netconf:notification:1.0">\
<eventTime>2020-06-29T14:23:46.086855+02:00</eventTime><notifi-leaf xmlns="notifi:mod">\
<lf>value</lf></notifi-leaf></notification>""");
}
@Test
- public void notifi_cont_leafTest() throws Exception {
+ void notifi_cont_leafTest() throws Exception {
final QName schemaPathNotifi = QName.create(MODULE, "notifi-cont");
- final DOMNotification notificationData = mock(DOMNotification.class);
-
- final LeafNode<String> leaf = mockLeaf(QName.create(MODULE, "lf"));
- final ContainerNode cont = mockCont(QName.create(MODULE, "cont"), leaf);
- final ContainerNode notifiBody = mockCont(schemaPathNotifi, cont);
+ final var leaf = mockLeaf(QName.create(MODULE, "lf"));
+ final var cont = mockCont(QName.create(MODULE, "cont"), leaf);
+ final var notifiBody = mockCont(schemaPathNotifi, cont);
when(notificationData.getType()).thenReturn(Absolute.of(schemaPathNotifi));
when(notificationData.getBody()).thenReturn(notifiBody);
- assertXmlMatches(prepareXmlResult(notificationData, schemaPathNotifi), """
+ assertXmlMatches(prepareXmlResult(schemaPathNotifi), """
<notification xmlns="urn:ietf:params:xml:ns:netconf:notification:1.0">\
<eventTime>2020-06-29T14:23:46.086855+02:00</eventTime><notifi-cont xmlns="notifi:mod">\
<cont><lf>value</lf></cont></notifi-cont></notification>""");
}
@Test
- public void notifi_list_Test() throws Exception {
+ void notifi_list_Test() throws Exception {
final QName schemaPathNotifi = QName.create(MODULE, "notifi-list");
- final DOMNotification notificationData = mock(DOMNotification.class);
-
- final LeafNode<String> leaf = mockLeaf(QName.create(MODULE, "lf"));
- final MapEntryNode entry = mockMapEntry(QName.create(MODULE, "lst"), leaf);
- final ContainerNode notifiBody = mockCont(schemaPathNotifi, Builders.mapBuilder()
+ final var leaf = mockLeaf(QName.create(MODULE, "lf"));
+ final var entry = mockMapEntry(QName.create(MODULE, "lst"), leaf);
+ final var notifiBody = mockCont(schemaPathNotifi, Builders.mapBuilder()
.withNodeIdentifier(NodeIdentifier.create(QName.create(MODULE, "lst")))
.withChild(entry)
.build());
when(notificationData.getType()).thenReturn(Absolute.of(schemaPathNotifi));
when(notificationData.getBody()).thenReturn(notifiBody);
- assertXmlMatches(prepareXmlResult(notificationData, schemaPathNotifi), """
+ assertXmlMatches(prepareXmlResult(schemaPathNotifi), """
<notification xmlns="urn:ietf:params:xml:ns:netconf:notification:1.0">\
<eventTime>2020-06-29T14:23:46.086855+02:00</eventTime><notifi-list xmlns="notifi:mod">\
<lst><lf>value</lf></lst></notifi-list></notification>""");
}
@Test
- public void notifi_grpTest() throws Exception {
+ void notifi_grpTest() throws Exception {
final QName schemaPathNotifi = QName.create(MODULE, "notifi-grp");
- final DOMNotification notificationData = mock(DOMNotification.class);
-
- final LeafNode<String> leaf = mockLeaf(QName.create(MODULE, "lf"));
- final ContainerNode notifiBody = mockCont(schemaPathNotifi, leaf);
+ final var leaf = mockLeaf(QName.create(MODULE, "lf"));
+ final var notifiBody = mockCont(schemaPathNotifi, leaf);
when(notificationData.getType()).thenReturn(Absolute.of(schemaPathNotifi));
when(notificationData.getBody()).thenReturn(notifiBody);
- assertXmlMatches(prepareXmlResult(notificationData, schemaPathNotifi), """
+ assertXmlMatches(prepareXmlResult(schemaPathNotifi), """
<notification xmlns="urn:ietf:params:xml:ns:netconf:notification:1.0">\
<eventTime>2020-06-29T14:23:46.086855+02:00</eventTime><notifi-grp xmlns="notifi:mod">\
<lf>value</lf></notifi-grp></notification>""");
}
@Test
- public void notifi_augmTest() throws Exception {
+ void notifi_augmTest() throws Exception {
final QName schemaPathNotifi = QName.create(MODULE, "notifi-augm");
- final DOMNotification notificationData = mock(DOMNotification.class);
-
- final LeafNode<String> leaf = mockLeaf(QName.create(MODULE, "lf-augm"));
- final ContainerNode notifiBody = mockCont(schemaPathNotifi, leaf);
+ final var leaf = mockLeaf(QName.create(MODULE, "lf-augm"));
+ final var notifiBody = mockCont(schemaPathNotifi, leaf);
when(notificationData.getType()).thenReturn(Absolute.of(schemaPathNotifi));
when(notificationData.getBody()).thenReturn(notifiBody);
- assertXmlMatches(prepareXmlResult(notificationData, schemaPathNotifi), """
+ assertXmlMatches(prepareXmlResult(schemaPathNotifi), """
<notification xmlns="urn:ietf:params:xml:ns:netconf:notification:1.0">\
<eventTime>2020-06-29T14:23:46.086855+02:00</eventTime><notifi-augm xmlns="notifi:mod">\
<lf-augm>value</lf-augm></notifi-augm></notification>""");
return ImmutableNodes.leafNode(leafQName, "value");
}
- private String prepareXmlResult(final DOMNotification notificationData, final QName schemaPathNotifi)
- throws Exception {
- final var ret = listenersBroker.registerNotificationListener(MODEL_CONTEXT, ImmutableSet.of(schemaPathNotifi),
- NotificationOutputType.XML).formatter().eventData(MODEL_CONTEXT, notificationData, Instant.now());
+ private String prepareXmlResult(final QName schemaPathNotifi) throws Exception {
+ final var ret = XMLNotificationFormatter.EMPTY.eventData(MODEL_CONTEXT, notificationData, Instant.now());
assertNotNull(ret);
return ret;
}