operationProvider = new SimulatedOperationProvider(idProvider, transformedCapabilities,
Optional.ofNullable(configuration.getNotificationFile()),
Optional.ofNullable(configuration.getInitialConfigXMLFile()));
+ } else if (configuration.isNotificationsSupported()) {
+ LOG.info("using SimulatedOperationProvider.");
+ operationProvider = new SimulatedOperationProvider(idProvider, transformedCapabilities,
+ Optional.ofNullable(configuration.getNotificationFile()),
+ Optional.empty());
} else {
LOG.info("using OperationsProvider.");
operationProvider = new OperationsProvider(idProvider, transformedCapabilities,
public static final Set<String> DEFAULT_BASE_CAPABILITIES_EXI = ImmutableSet.of(
XmlNetconfConstants.URN_IETF_PARAMS_NETCONF_BASE_1_0,
XmlNetconfConstants.URN_IETF_PARAMS_NETCONF_BASE_1_1,
- XmlNetconfConstants.URN_IETF_PARAMS_NETCONF_CAPABILITY_EXI_1_0
+ XmlNetconfConstants.URN_IETF_PARAMS_NETCONF_CAPABILITY_EXI_1_0,
+ XmlNetconfConstants.URN_IETF_PARAMS_NETCONF_CAPABILITY_NOTIFICATION_1_0
);
public static final Set<String> DEFAULT_BASE_CAPABILITIES = ImmutableSet.of(
@Deprecated
public boolean isXmlConfigurationProvided() {
- return initialConfigXMLFile != null && notificationFile != null;
+ return initialConfigXMLFile != null;
+ }
+
+ @Deprecated
+ public boolean isNotificationsSupported() {
+ return notificationFile != null;
}
@Deprecated
<packaging>bundle</packaging>
<name>${project.artifactId}</name>
+ <dependencies>
+ <dependency>
+ <groupId>org.opendaylight.mdsal.binding.model.ietf</groupId>
+ <artifactId>rfc6991-ietf-inet-types</artifactId>
+ </dependency>
+ </dependencies>
</project>
--- /dev/null
+module odl-device-notification {
+ namespace "urn:opendaylight:device:notification";
+ prefix "device-notifi";
+
+ import ietf-inet-types {
+ prefix inet;
+ }
+
+ organization "OpenDaylight";
+
+ description
+ "This module contains the definition of methods related to
+ device notification model
+
+ Copyright (c)2022 Nikhil Soni and others. All rights reserved.
+
+ This program and the accompanying materials are made available
+ under the terms of the Eclipse Public License v1.0 which
+ accompanies this distribution, and is available at
+ http://www.eclipse.org/legal/epl-v10.html";
+
+ revision "2022-11-06" {
+ description
+ "Initial revision";
+ }
+
+ rpc subscribe-device-notification {
+ description
+ "Subscribe to notifications on specified device.";
+
+ input {
+ leaf path {
+ type instance-identifier;
+ description "Device mount point path";
+ }
+ }
+ output {
+ leaf stream-path {
+ type inet:uri;
+ description "Device Notification stream URL";
+ }
+ }
+ }
+}
streamSubscription,
new RestconfDataServiceImpl(databindProvider, dataBroker, mountPointService, streamSubscription,
actionService, configuration),
- new RestconfInvokeOperationsServiceImpl(rpcService),
+ new RestconfInvokeOperationsServiceImpl(rpcService, mountPointService, configuration),
new RestconfOperationsServiceImpl(databindProvider, mountPointService),
new RestconfSchemaServiceImpl(domSchemaService, mountPointService),
new RestconfImpl(databindProvider)));
import static java.util.Objects.requireNonNull;
import com.google.common.collect.ImmutableSet;
+import java.util.Collection;
import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
import org.eclipse.jdt.annotation.Nullable;
import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
+import org.opendaylight.mdsal.dom.api.DOMMountPoint;
+import org.opendaylight.mdsal.dom.api.DOMMountPointService;
+import org.opendaylight.mdsal.dom.api.DOMNotificationService;
import org.opendaylight.mdsal.dom.api.DOMRpcResult;
+import org.opendaylight.mdsal.dom.api.DOMSchemaService;
import org.opendaylight.mdsal.dom.spi.DefaultDOMRpcResult;
import org.opendaylight.restconf.common.errors.RestconfDocumentedException;
import org.opendaylight.restconf.nb.rfc8040.legacy.NormalizedNodePayload;
import org.opendaylight.restconf.nb.rfc8040.rests.utils.RestconfStreamsConstants;
+import org.opendaylight.restconf.nb.rfc8040.streams.listeners.DeviceNotificationListenerAdaptor;
import org.opendaylight.restconf.nb.rfc8040.streams.listeners.ListenersBroker;
import org.opendaylight.restconf.nb.rfc8040.streams.listeners.NotificationListenerAdapter;
import org.opendaylight.restconf.nb.rfc8040.utils.parser.IdentifierCodec;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.device.notification.rev221106.SubscribeDeviceNotificationInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.device.notification.rev221106.SubscribeDeviceNotificationOutput;
import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev140708.CreateDataChangeEventSubscriptionInput1.Scope;
import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev140708.NotificationOutputTypeGrouping;
import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev140708.NotificationOutputTypeGrouping.NotificationOutputType;
final class CreateStreamUtil {
private static final Logger LOG = LoggerFactory.getLogger(CreateStreamUtil.class);
private static final QNameModule SAL_REMOTE_AUGMENT = NotificationOutputTypeGrouping.QNAME.getModule();
+
+ private static final QNameModule DEVICE_NOTIFICATION_MODULE = SubscribeDeviceNotificationInput.QNAME.getModule();
private static final QName DATASTORE_QNAME =
QName.create(SAL_REMOTE_AUGMENT, RestconfStreamsConstants.DATASTORE_PARAM_NAME).intern();
private static final QName SCOPE_QNAME =
QName.create(SAL_REMOTE_AUGMENT, RestconfStreamsConstants.SCOPE_PARAM_NAME).intern();
private static final QName OUTPUT_TYPE_QNAME =
QName.create(SAL_REMOTE_AUGMENT, "notification-output-type").intern();
+ private static final QName DEVICE_NOTIFICATION_PATH_QNAME =
+ QName.create(DEVICE_NOTIFICATION_MODULE, "path").intern();
+ private static final QName DEVICE_NOTIFICATION_STREAM_PATH =
+ QName.create(DEVICE_NOTIFICATION_PATH_QNAME, "stream-path").intern();
private static final NodeIdentifier DATASTORE_NODEID = NodeIdentifier.create(DATASTORE_QNAME);
private static final NodeIdentifier SCOPE_NODEID = NodeIdentifier.create(SCOPE_QNAME);
private static final NodeIdentifier OUTPUT_TYPE_NODEID = NodeIdentifier.create(OUTPUT_TYPE_QNAME);
-
+ private static final NodeIdentifier DEVICE_NOTIFICATION_PATH_NODEID =
+ NodeIdentifier.create(DEVICE_NOTIFICATION_PATH_QNAME);
private static final AugmentationIdentifier SAL_REMOTE_AUG_IDENTIFIER = new AugmentationIdentifier(
ImmutableSet.of(SCOPE_QNAME, DATASTORE_QNAME, OUTPUT_TYPE_QNAME));
.build());
}
+ /**
+ * Create device notification stream.
+ *
+ * @param baseUrl base Url
+ * @param payload data
+ * @param refSchemaCtx Reference to {@link EffectiveModelContext}.
+ * @param streamUtil stream utility
+ * @param mountPointService dom mount point service
+ * @return {@link DOMRpcResult} - Output of RPC - example in JSON
+ */
+ static DOMRpcResult createDeviceNotificationListener(final String baseUrl, final NormalizedNodePayload payload,
+ final EffectiveModelContext refSchemaCtx, final SubscribeToStreamUtil streamUtil,
+ final DOMMountPointService mountPointService) {
+ // parsing out of container with settings and path
+ final ContainerNode data = (ContainerNode) requireNonNull(payload).getData();
+ final YangInstanceIdentifier value =
+ (YangInstanceIdentifier) data.findChildByArg(DEVICE_NOTIFICATION_PATH_NODEID)
+ .map(DataContainerChild::body)
+ .orElseThrow(() -> new RestconfDocumentedException("Mount point not available", ErrorType.APPLICATION,
+ ErrorTag.OPERATION_FAILED));
+ final DOMMountPoint mountPoint = mountPointService.getMountPoint(value)
+ .orElseThrow(() -> new RestconfDocumentedException("Mount point not available", ErrorType.APPLICATION,
+ ErrorTag.OPERATION_FAILED));
+
+ final String deviceName = extractDeviceName(value);
+ final NotificationOutputType outputType = prepareOutputType(data);
+ EffectiveModelContext effectiveModelContext = mountPoint.getService(DOMSchemaService.class).get()
+ .getGlobalContext();
+ Collection<? extends NotificationDefinition> notificationDefinitions = mountPoint.getService(
+ DOMSchemaService.class).get().getGlobalContext()
+ .getNotifications();
+ if (notificationDefinitions == null || notificationDefinitions.isEmpty()) {
+ throw new RestconfDocumentedException("Device does not support notification", ErrorType.APPLICATION,
+ ErrorTag.OPERATION_FAILED);
+ }
+
+ Set<Absolute> absolutes = notificationDefinitions.stream()
+ .map(notificationDefinition -> Absolute.of(notificationDefinition.getQName()))
+ .collect(Collectors.toUnmodifiableSet());
+
+ final DeviceNotificationListenerAdaptor notificationListenerAdapter = ListenersBroker.getInstance()
+ .registerDeviceNotificationListener(deviceName, outputType, effectiveModelContext, mountPointService,
+ mountPoint.getIdentifier());
+ notificationListenerAdapter.listen(mountPoint.getService(DOMNotificationService.class).get(), absolutes);
+
+ // building of output
+ return new DefaultDOMRpcResult(Builders.containerBuilder()
+ .withNodeIdentifier(new NodeIdentifier(SubscribeDeviceNotificationOutput.QNAME))
+ .withChild(ImmutableNodes.leafNode(DEVICE_NOTIFICATION_STREAM_PATH, baseUrl + deviceName
+ + "?" + RestconfStreamsConstants.NOTIFICATION_TYPE + "=" + RestconfStreamsConstants.DEVICE))
+ .build());
+ }
+
+ private static String extractDeviceName(final YangInstanceIdentifier iid) {
+ return ((YangInstanceIdentifier.NodeIdentifierWithPredicates.Singleton)iid.getLastPathArgument())
+ .values().getElement().toString();
+ }
+
/**
* Prepare {@link NotificationOutputType}.
*
import org.opendaylight.controller.config.threadpool.ScheduledThreadPool;
import org.opendaylight.restconf.common.errors.RestconfDocumentedException;
import org.opendaylight.restconf.nb.rfc8040.rests.services.api.RestconfDataStreamService;
+import org.opendaylight.restconf.nb.rfc8040.rests.utils.RestconfStreamsConstants;
import org.opendaylight.restconf.nb.rfc8040.streams.Configuration;
import org.opendaylight.restconf.nb.rfc8040.streams.SSESessionHandler;
import org.opendaylight.restconf.nb.rfc8040.streams.listeners.BaseListenerInterface;
@Override
public void getSSE(final String identifier, final UriInfo uriInfo, final SseEventSink sink, final Sse sse) {
final String streamName = ListenersBroker.createStreamNameFromUri(identifier);
- final BaseListenerInterface listener = listenersBroker.getListenerFor(streamName)
- .orElseThrow(() -> {
- LOG.debug("Listener for stream with name {} was not found.", streamName);
- throw new RestconfDocumentedException("Data missing", ErrorType.APPLICATION, ErrorTag.DATA_MISSING);
- });
+ final BaseListenerInterface listener;
+ final String notificaionType =
+ uriInfo.getQueryParameters().getFirst(RestconfStreamsConstants.NOTIFICATION_TYPE);
+ if (notificaionType != null && notificaionType.equals(RestconfStreamsConstants.DEVICE)) {
+ listener = listenersBroker.getDeviceNotificationListenerFor(streamName)
+ .orElseThrow(() -> {
+ LOG.debug("Listener for device path with name {} was not found.", streamName);
+ throw new RestconfDocumentedException("Data missing", ErrorType.APPLICATION,
+ ErrorTag.DATA_MISSING);
+ });
+ } else {
+ listener = listenersBroker.getListenerFor(streamName)
+ .orElseThrow(() -> {
+ LOG.debug("Listener for stream with name {} was not found.", streamName);
+ throw new RestconfDocumentedException("Data missing", ErrorType.APPLICATION, ErrorTag.DATA_MISSING);
+ });
+ }
LOG.debug("Listener for stream with name {} has been found, SSE session handler will be created.", streamName);
-
// FIXME: invert control here: we should call 'listener.addSession()', which in turn should call
// handler.init()/handler.close()
final SSESessionHandler handler = new SSESessionHandler(executorService, sink, sse, listener,
import javax.ws.rs.core.UriInfo;
import org.eclipse.jdt.annotation.NonNull;
import org.opendaylight.mdsal.dom.api.DOMMountPoint;
+import org.opendaylight.mdsal.dom.api.DOMMountPointService;
import org.opendaylight.mdsal.dom.api.DOMRpcException;
import org.opendaylight.mdsal.dom.api.DOMRpcResult;
import org.opendaylight.mdsal.dom.api.DOMRpcService;
import org.opendaylight.restconf.common.errors.RestconfDocumentedException;
import org.opendaylight.restconf.nb.rfc8040.legacy.NormalizedNodePayload;
import org.opendaylight.restconf.nb.rfc8040.rests.services.api.RestconfInvokeOperationsService;
+import org.opendaylight.restconf.nb.rfc8040.streams.Configuration;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.device.notification.rev221106.SubscribeDeviceNotificationInput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.remote.rev140114.CreateDataChangeEventSubscriptionInput;
import org.opendaylight.yangtools.yang.common.ErrorTag;
import org.opendaylight.yangtools.yang.common.ErrorType;
// FIXME: at some point we do not want to have this here, as this is only used for dispatch
private static final QNameModule SAL_REMOTE_NAMESPACE = CreateDataChangeEventSubscriptionInput.QNAME.getModule();
+ private static final QNameModule DEVICE_NOTIFICATION_NAMESPACE = SubscribeDeviceNotificationInput.QNAME.getModule();
private final DOMRpcService rpcService;
+ private final DOMMountPointService mountPointService;
+ private final SubscribeToStreamUtil streamUtils;
- public RestconfInvokeOperationsServiceImpl(final DOMRpcService rpcService) {
+ public RestconfInvokeOperationsServiceImpl(final DOMRpcService rpcService,
+ final DOMMountPointService mountPointService, final Configuration configuration) {
this.rpcService = requireNonNull(rpcService);
+ this.mountPointService = requireNonNull(mountPointService);
+ streamUtils = configuration.isUseSSE() ? SubscribeToStreamUtil.serverSentEvents()
+ : SubscribeToStreamUtil.webSockets();
}
@Override
future = Futures.immediateFailedFuture(new RestconfDocumentedException("Unsupported operation",
ErrorType.RPC, ErrorTag.OPERATION_NOT_SUPPORTED));
}
+ } else if (DEVICE_NOTIFICATION_NAMESPACE.equals(rpcName.getModule())) {
+ // FIXME: this should be a match on RPC QName
+ final String baseUrl = streamUtils.prepareUriByStreamName(uriInfo, "").toString();
+ future = Futures.immediateFuture(
+ CreateStreamUtil.createDeviceNotificationListener(baseUrl, payload, schemaContext, streamUtils,
+ mountPointService));
} else {
future = invokeRpc((ContainerNode)payload.getData(), rpcName, rpcService);
}
public static final String STREAM_PATH = STREAMS_PATH + STREAM_PATH_PART;
public static final String STREAM_ACCESS_PATH_PART = "/access=";
public static final String STREAM_LOCATION_PATH_PART = "/location";
+ public static final String NOTIFICATION_TYPE = "notificationType";
+ public static final String DEVICE = "device";
private RestconfStreamsConstants() {
// Hidden on purpose
import static java.util.Objects.requireNonNull;
import com.google.common.base.MoreObjects;
+import com.google.common.base.MoreObjects.ToStringHelper;
import java.time.Instant;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeFormatterBuilder;
/**
* Features of subscribing part of both notifications.
*/
-abstract class AbstractCommonSubscriber<P, T> extends AbstractNotificationsData implements BaseListenerInterface {
+abstract class AbstractCommonSubscriber<T> extends AbstractNotificationsData implements BaseListenerInterface {
private static final Logger LOG = LoggerFactory.getLogger(AbstractCommonSubscriber.class);
private static final DateTimeFormatter FORMATTER = new DateTimeFormatterBuilder()
.appendValue(ChronoField.YEAR, 4).appendLiteral('-')
private final EventFormatterFactory<T> formatterFactory;
private final NotificationOutputType outputType;
private final String streamName;
- private final P path;
@GuardedBy("this")
private final Set<StreamSessionHandler> subscribers = new HashSet<>();
private boolean changedLeafNodesOnly = false;
private EventFormatter<T> formatter;
- AbstractCommonSubscriber(final QName lastQName, final String streamName, final P path,
- final NotificationOutputType outputType, final EventFormatterFactory<T> formatterFactory) {
+ AbstractCommonSubscriber(final QName lastQName, final String streamName, final NotificationOutputType outputType,
+ final EventFormatterFactory<T> formatterFactory) {
super(lastQName);
this.streamName = requireNonNull(streamName);
checkArgument(!streamName.isEmpty());
- this.path = requireNonNull(path);
this.outputType = requireNonNull(outputType);
this.formatterFactory = requireNonNull(formatterFactory);
}
}
- final P path() {
- return path;
- }
-
/**
* Check whether this query should only notify about leaf node changes.
*
@Override
public final String toString() {
- return MoreObjects.toStringHelper(this)
- .add("path", path)
- .add("stream-name", streamName)
- .add("output-type", getOutputType())
- .toString();
+ return addToStringAttributes(MoreObjects.toStringHelper(this)).toString();
+ }
+
+ ToStringHelper addToStringAttributes(final ToStringHelper helper) {
+ return helper.add("stream-name", streamName).add("output-type", getOutputType());
}
/**
--- /dev/null
+/*
+ * Copyright (c) 2022 PANTHEON.tech, s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.restconf.nb.rfc8040.streams.listeners;
+
+import java.time.Instant;
+import java.util.Optional;
+import org.eclipse.jdt.annotation.NonNull;
+import org.opendaylight.mdsal.dom.api.DOMEvent;
+import org.opendaylight.mdsal.dom.api.DOMNotification;
+import org.opendaylight.mdsal.dom.api.DOMNotificationListener;
+import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev140708.NotificationOutputTypeGrouping.NotificationOutputType;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.data.codec.gson.JSONCodecFactorySupplier;
+import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Abstract base class for functionality shared between {@link NotificationListenerAdapter} and
+ * {@link DeviceNotificationListenerAdaptor}.
+ */
+abstract class AbstractNotificationListenerAdaptor extends AbstractCommonSubscriber<DOMNotification>
+ implements DOMNotificationListener {
+ private static final Logger LOG = LoggerFactory.getLogger(AbstractNotificationListenerAdaptor.class);
+ private static final NotificationFormatterFactory JSON_FORMATTER_FACTORY =
+ JSONNotificationFormatter.createFactory(JSONCodecFactorySupplier.RFC7951);
+
+ AbstractNotificationListenerAdaptor(final QName lastQName, final String streamName,
+ final NotificationOutputType outputType) {
+ super(lastQName, streamName, outputType, getFormatterFactory(outputType));
+ }
+
+ private static NotificationFormatterFactory getFormatterFactory(final NotificationOutputType outputType) {
+ return switch (outputType) {
+ case JSON -> JSON_FORMATTER_FACTORY;
+ case XML -> XMLNotificationFormatter.FACTORY;
+ };
+ }
+
+ @Override
+ @SuppressWarnings("checkstyle:IllegalCatch")
+ public final void onNotification(final DOMNotification notification) {
+ final var eventInstant = notification instanceof DOMEvent domEvent ? domEvent.getEventInstant() : Instant.now();
+ if (!checkStartStop(eventInstant)) {
+ return;
+ }
+
+ final Optional<String> maybeOutput;
+ try {
+ maybeOutput = formatter().eventData(effectiveModel(), notification, eventInstant, getLeafNodesOnly(),
+ isSkipNotificationData(), getChangedLeafNodesOnly());
+ } catch (Exception e) {
+ LOG.error("Failed to process notification {}", notification, e);
+ return;
+ }
+ maybeOutput.ifPresent(this::post);
+ }
+
+ abstract @NonNull EffectiveModelContext effectiveModel();
+}
--- /dev/null
+/*
+ * Copyright (c) 2022 Opendaylight, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.restconf.nb.rfc8040.streams.listeners;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.Set;
+import org.eclipse.jdt.annotation.NonNull;
+import org.opendaylight.mdsal.dom.api.DOMMountPointListener;
+import org.opendaylight.mdsal.dom.api.DOMMountPointService;
+import org.opendaylight.mdsal.dom.api.DOMNotificationService;
+import org.opendaylight.restconf.nb.rfc8040.streams.SSESessionHandler;
+import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev140708.NotificationOutputTypeGrouping.NotificationOutputType;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
+import org.opendaylight.yangtools.yang.model.api.stmt.SchemaNodeIdentifier.Absolute;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link DeviceNotificationListenerAdaptor} is responsible to track events on notifications.
+ */
+public final class DeviceNotificationListenerAdaptor extends AbstractNotificationListenerAdaptor implements
+ DOMMountPointListener {
+
+ private static final Logger LOG = LoggerFactory.getLogger(DeviceNotificationListenerAdaptor.class);
+ private final @NonNull EffectiveModelContext effectiveModel;
+
+ private final @NonNull DOMMountPointService mountPointService;
+
+ private ListenerRegistration<DOMMountPointListener> reg;
+ private final YangInstanceIdentifier instanceIdentifier;
+
+
+ public DeviceNotificationListenerAdaptor(final String streamName, final NotificationOutputType outputType,
+ final EffectiveModelContext effectiveModel, final DOMMountPointService mountPointService,
+ final YangInstanceIdentifier path) {
+ // FIXME: a dummy QName due to contracts
+ super(QName.create("dummy", "dummy"), streamName, outputType);
+ this.effectiveModel = requireNonNull(effectiveModel);
+ this.mountPointService = requireNonNull(mountPointService);
+ instanceIdentifier = requireNonNull(path);
+ }
+
+ public synchronized void listen(final DOMNotificationService notificationService, final Set<Absolute> paths) {
+ if (!isListening()) {
+ setRegistration(notificationService.registerNotificationListener(this, paths));
+ reg = mountPointService.registerProvisionListener(this);
+ }
+ }
+
+ private synchronized void resetListenerRegistration() {
+ if (reg != null) {
+ reg.close();
+ reg = null;
+ }
+ }
+
+ @Override
+ EffectiveModelContext effectiveModel() {
+ return effectiveModel;
+ }
+
+ @Override
+ public void onMountPointCreated(final YangInstanceIdentifier path) {
+ // No-op
+ }
+
+ @Override
+ public void onMountPointRemoved(final YangInstanceIdentifier path) {
+ if (instanceIdentifier.equals(path)) {
+ getSubscribers().forEach(subscriber -> {
+ if (subscriber.isConnected()) {
+ subscriber.sendDataMessage("Device disconnected");
+ }
+ if (subscriber instanceof SSESessionHandler sseSessionHandler) {
+ try {
+ sseSessionHandler.close();
+ } catch (IllegalStateException e) {
+ LOG.warn("Ignoring exception while closing sse session");
+ }
+ }
+ });
+ ListenersBroker.getInstance().removeAndCloseDeviceNotificationListener(this);
+ resetListenerRegistration();
+ }
+ }
+
+}
*/
package org.opendaylight.restconf.nb.rfc8040.streams.listeners;
+import static java.util.Objects.requireNonNull;
+
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.MoreObjects.ToStringHelper;
import java.time.Instant;
import java.util.Collection;
import java.util.List;
/**
* {@link ListenerAdapter} is responsible to track events, which occurred by changing data in data source.
*/
-public class ListenerAdapter extends AbstractCommonSubscriber<YangInstanceIdentifier, Collection<DataTreeCandidate>>
+public class ListenerAdapter extends AbstractCommonSubscriber<Collection<DataTreeCandidate>>
implements ClusteredDOMDataTreeChangeListener {
private static final Logger LOG = LoggerFactory.getLogger(ListenerAdapter.class);
private static final DataTreeCandidateFormatterFactory JSON_FORMATTER_FACTORY =
JSONDataTreeCandidateFormatter.createFactory(JSONCodecFactorySupplier.RFC7951);
+ private final YangInstanceIdentifier path;
+
/**
* Creates new {@link ListenerAdapter} listener specified by path and stream name and register for subscribing.
*
@VisibleForTesting
public ListenerAdapter(final YangInstanceIdentifier path, final String streamName,
final NotificationOutputType outputType) {
- super(path.getLastPathArgument().getNodeType(), streamName, path, outputType, getFormatterFactory(outputType));
+ super(path.getLastPathArgument().getNodeType(), streamName, outputType, getFormatterFactory(outputType));
+ this.path = requireNonNull(path);
}
private static DataTreeCandidateFormatterFactory getFormatterFactory(final NotificationOutputType outputType) {
* @return Path pointed to data in data store.
*/
public YangInstanceIdentifier getPath() {
- return path();
+ return path;
}
/**
new DOMDataTreeIdentifier(datastore, getPath()), this));
}
}
+
+ @Override
+ ToStringHelper addToStringAttributes(final ToStringHelper helper) {
+ return super.addToStringAttributes(helper.add("path", path));
+ }
}
import java.util.concurrent.ExecutionException;
import java.util.concurrent.locks.StampedLock;
import java.util.function.Function;
+import org.opendaylight.mdsal.dom.api.DOMMountPointService;
import org.opendaylight.restconf.nb.rfc8040.rests.utils.RestconfStreamsConstants;
import org.opendaylight.restconf.nb.rfc8040.utils.RestconfConstants;
import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev140708.NotificationOutputTypeGrouping.NotificationOutputType;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
import org.opendaylight.yangtools.yang.model.api.NotificationDefinition;
import org.opendaylight.yangtools.yang.model.api.stmt.SchemaNodeIdentifier.Absolute;
import org.slf4j.Logger;
private final StampedLock dataChangeListenersLock = new StampedLock();
private final StampedLock notificationListenersLock = new StampedLock();
+ private final StampedLock deviceNotificationListenersLock = new StampedLock();
private final BiMap<String, ListenerAdapter> dataChangeListeners = HashBiMap.create();
private final BiMap<String, NotificationListenerAdapter> notificationListeners = HashBiMap.create();
+ private final BiMap<String, DeviceNotificationListenerAdaptor> deviceNotificationListeners = HashBiMap.create();
+
private ListenersBroker() {
}
}
+ /**
+ * Get listener for device path.
+ *
+ * @param path name.
+ * @return {@link NotificationListenerAdapter} or {@link ListenerAdapter} object wrapped in {@link Optional}
+ * or {@link Optional#empty()} if listener with specified path doesn't exist.
+ */
+ public Optional<BaseListenerInterface> getDeviceNotificationListenerFor(final String path) {
+ final long stamp = deviceNotificationListenersLock.readLock();
+ try {
+ return Optional.ofNullable(deviceNotificationListeners.get(requireNonNull(path)));
+ } finally {
+ deviceNotificationListenersLock.unlockRead(stamp);
+ }
+ }
+
/**
* Get listener for stream-name.
*
}
}
+ /**
+ * Creates new {@link DeviceNotificationListenerAdaptor} listener using input stream name and schema path
+ * if such listener haven't been created yet.
+ *
+ * @param streamName Stream name.
+ * @param outputType Specific type of output for notifications - XML or JSON.
+ * @param refSchemaCtx Schema context of node
+ * @param mountPointService Mount point service
+ * @return Created or existing device notification listener adapter.
+ */
+ public DeviceNotificationListenerAdaptor registerDeviceNotificationListener(final String streamName,
+ final NotificationOutputType outputType, final EffectiveModelContext refSchemaCtx,
+ final DOMMountPointService mountPointService, final YangInstanceIdentifier path) {
+
+ final long stamp = deviceNotificationListenersLock.writeLock();
+ try {
+ return deviceNotificationListeners.computeIfAbsent(streamName,
+ stream -> new DeviceNotificationListenerAdaptor(streamName, outputType, refSchemaCtx,
+ mountPointService, path));
+ } finally {
+ deviceNotificationListenersLock.unlockWrite(stamp);
+ }
+ }
+
/**
* Removal and closing of all data-change-event and notification listeners.
*/
}
}
+ /**
+ * Removes and closes device notification listener of type {@link NotificationListenerAdapter}
+ * specified in parameter.
+ *
+ * @param listener Listener to be closed and removed.
+ */
+ @SuppressWarnings("checkstyle:IllegalCatch")
+ public void removeAndCloseDeviceNotificationListener(final DeviceNotificationListenerAdaptor listener) {
+ final long stamp = deviceNotificationListenersLock.writeLock();
+ try {
+ requireNonNull(listener);
+ if (deviceNotificationListeners.inverse().remove(listener) == null) {
+ LOG.warn("There isn't any device notification stream that would match listener adapter {}.", listener);
+ }
+ } catch (final Exception exception) {
+ LOG.error("Device Notification listener {} cannot be closed.", listener, exception);
+ } finally {
+ deviceNotificationListenersLock.unlockWrite(stamp);
+ }
+ }
+
private void removeAndCloseNotificationListenerTemplate(final NotificationListenerAdapter listener) {
try {
requireNonNull(listener).close();
*/
package org.opendaylight.restconf.nb.rfc8040.streams.listeners;
-import java.time.Instant;
-import java.util.Optional;
-import org.opendaylight.mdsal.dom.api.DOMNotification;
-import org.opendaylight.mdsal.dom.api.DOMNotificationListener;
+import static java.util.Objects.requireNonNull;
+
+import com.google.common.base.MoreObjects.ToStringHelper;
import org.opendaylight.mdsal.dom.api.DOMNotificationService;
import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev140708.NotificationOutputTypeGrouping.NotificationOutputType;
-import org.opendaylight.yangtools.yang.data.codec.gson.JSONCodecFactorySupplier;
+import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
import org.opendaylight.yangtools.yang.model.api.stmt.SchemaNodeIdentifier.Absolute;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
* {@link NotificationListenerAdapter} is responsible to track events on notifications.
*/
-public final class NotificationListenerAdapter extends AbstractCommonSubscriber<Absolute, DOMNotification>
- implements DOMNotificationListener {
-
- private static final Logger LOG = LoggerFactory.getLogger(NotificationListenerAdapter.class);
- private static final NotificationFormatterFactory JSON_FORMATTER_FACTORY = JSONNotificationFormatter.createFactory(
- JSONCodecFactorySupplier.RFC7951);
+public final class NotificationListenerAdapter extends AbstractNotificationListenerAdaptor {
+ private final Absolute path;
/**
* Set path of listener and stream name.
* @param outputType Type of output on notification (JSON or XML).
*/
NotificationListenerAdapter(final Absolute path, final String streamName, final NotificationOutputType outputType) {
- super(path.lastNodeIdentifier(), streamName, path, outputType, getFormatterFactory(outputType));
- }
-
- private static NotificationFormatterFactory getFormatterFactory(final NotificationOutputType outputType) {
- return switch (outputType) {
- case JSON -> JSON_FORMATTER_FACTORY;
- case XML -> XMLNotificationFormatter.FACTORY;
- };
+ super(path.lastNodeIdentifier(), streamName, outputType);
+ this.path = requireNonNull(path);
}
@Override
- @SuppressWarnings("checkstyle:IllegalCatch")
- public void onNotification(final DOMNotification notification) {
- final Instant now = Instant.now();
- if (!checkStartStop(now)) {
- return;
- }
-
- final Optional<String> maybeOutput;
- try {
- maybeOutput = formatter().eventData(databindProvider.currentContext().modelContext(), notification, now,
- getLeafNodesOnly(), isSkipNotificationData(), getChangedLeafNodesOnly());
- } catch (Exception e) {
- LOG.error("Failed to process notification {}", notification, e);
- return;
- }
- if (maybeOutput.isPresent()) {
- post(maybeOutput.get());
- }
+ EffectiveModelContext effectiveModel() {
+ return databindProvider.currentContext().modelContext();
}
/**
* @return The configured schema path that points to observing YANG notification schema node.
*/
public Absolute getSchemaPath() {
- return path();
+ return path;
}
public synchronized void listen(final DOMNotificationService notificationService) {
setRegistration(notificationService.registerNotificationListener(this, getSchemaPath()));
}
}
+
+ @Override
+ ToStringHelper addToStringAttributes(final ToStringHelper helper) {
+ return super.addToStringAttributes(helper.add("path", path));
+ }
}
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
import org.opendaylight.mdsal.dom.api.DOMMountPoint;
+import org.opendaylight.mdsal.dom.api.DOMMountPointService;
import org.opendaylight.mdsal.dom.api.DOMRpcException;
import org.opendaylight.mdsal.dom.api.DOMRpcImplementationNotAvailableException;
import org.opendaylight.mdsal.dom.api.DOMRpcResult;
import org.opendaylight.restconf.common.errors.RestconfDocumentedException;
import org.opendaylight.restconf.nb.rfc8040.TestRestconfUtils;
import org.opendaylight.restconf.nb.rfc8040.legacy.NormalizedNodePayload;
+import org.opendaylight.restconf.nb.rfc8040.streams.Configuration;
import org.opendaylight.yangtools.yang.common.ErrorTag;
import org.opendaylight.yangtools.yang.common.ErrorType;
import org.opendaylight.yangtools.yang.common.QName;
private DOMRpcService rpcService;
@Mock
private DOMMountPoint mountPoint;
+ @Mock
+ private DOMMountPointService mountPointService;
private RestconfInvokeOperationsServiceImpl invokeOperationsService;
@BeforeClass
@Before
public void setup() {
- invokeOperationsService = new RestconfInvokeOperationsServiceImpl(rpcService);
+ invokeOperationsService = new RestconfInvokeOperationsServiceImpl(rpcService, mountPointService,
+ mock(Configuration.class));
}
@Test