From 0b295aa2567bd0258ddbb6d2b88e2212e1e2c788 Mon Sep 17 00:00:00 2001 From: Nikhil Soni Date: Thu, 25 Aug 2022 17:06:59 +0530 Subject: [PATCH] Netconf Device Notification Requirement - HTTP client should be able to get Device notification after subscription. Implementation: 1. Added a new Device Notification Yang for Rest request which takes input of Device mount point path 2. Added a DeviceNotificationListenerAdaptor which is get registered in Device Notification service (on Mount point) 3. Created a dynamic Rest Notification SSE HTTP rest url (Response of 1st rest call ) 4. Updated Test-tool for Notification JIRA: NETCONF-745 Change-Id: I9cac35d0a7b0f79382d223ae4ec0ac4a3ce2bae9 Signed-off-by: Nikhil Soni Signed-off-by: nikhil.soni.lumina Signed-off-by: Robert Varga --- .../test/tool/NetconfDeviceSimulator.java | 5 + .../test/tool/config/Configuration.java | 10 +- restconf/restconf-common-models/pom.xml | 6 ++ .../main/yang/odl-device-notification.yang | 45 +++++++++ .../nb/rfc8040/RestconfApplication.java | 2 +- .../rests/services/impl/CreateStreamUtil.java | 77 ++++++++++++++- .../impl/RestconfDataStreamServiceImpl.java | 24 +++-- .../RestconfInvokeOperationsServiceImpl.java | 18 +++- .../rests/utils/RestconfStreamsConstants.java | 2 + .../listeners/AbstractCommonSubscriber.java | 23 ++--- .../AbstractNotificationListenerAdaptor.java | 65 +++++++++++++ .../DeviceNotificationListenerAdaptor.java | 96 +++++++++++++++++++ .../streams/listeners/ListenerAdapter.java | 17 +++- .../streams/listeners/ListenersBroker.java | 66 +++++++++++++ .../NotificationListenerAdapter.java | 56 ++++------- ...stconfInvokeOperationsServiceImplTest.java | 7 +- 16 files changed, 450 insertions(+), 69 deletions(-) create mode 100644 restconf/restconf-common-models/src/main/yang/odl-device-notification.yang create mode 100644 restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/listeners/AbstractNotificationListenerAdaptor.java create mode 100644 restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/listeners/DeviceNotificationListenerAdaptor.java diff --git a/netconf/tools/netconf-testtool/src/main/java/org/opendaylight/netconf/test/tool/NetconfDeviceSimulator.java b/netconf/tools/netconf-testtool/src/main/java/org/opendaylight/netconf/test/tool/NetconfDeviceSimulator.java index 6f1ba7a0e7..0a92bc109c 100644 --- a/netconf/tools/netconf-testtool/src/main/java/org/opendaylight/netconf/test/tool/NetconfDeviceSimulator.java +++ b/netconf/tools/netconf-testtool/src/main/java/org/opendaylight/netconf/test/tool/NetconfDeviceSimulator.java @@ -144,6 +144,11 @@ public class NetconfDeviceSimulator implements Closeable { operationProvider = new SimulatedOperationProvider(idProvider, transformedCapabilities, Optional.ofNullable(configuration.getNotificationFile()), Optional.ofNullable(configuration.getInitialConfigXMLFile())); + } else if (configuration.isNotificationsSupported()) { + LOG.info("using SimulatedOperationProvider."); + operationProvider = new SimulatedOperationProvider(idProvider, transformedCapabilities, + Optional.ofNullable(configuration.getNotificationFile()), + Optional.empty()); } else { LOG.info("using OperationsProvider."); operationProvider = new OperationsProvider(idProvider, transformedCapabilities, diff --git a/netconf/tools/netconf-testtool/src/main/java/org/opendaylight/netconf/test/tool/config/Configuration.java b/netconf/tools/netconf-testtool/src/main/java/org/opendaylight/netconf/test/tool/config/Configuration.java index 4d9c40e702..765ec49fa8 100644 --- a/netconf/tools/netconf-testtool/src/main/java/org/opendaylight/netconf/test/tool/config/Configuration.java +++ b/netconf/tools/netconf-testtool/src/main/java/org/opendaylight/netconf/test/tool/config/Configuration.java @@ -28,7 +28,8 @@ public class Configuration { public static final Set DEFAULT_BASE_CAPABILITIES_EXI = ImmutableSet.of( XmlNetconfConstants.URN_IETF_PARAMS_NETCONF_BASE_1_0, XmlNetconfConstants.URN_IETF_PARAMS_NETCONF_BASE_1_1, - XmlNetconfConstants.URN_IETF_PARAMS_NETCONF_CAPABILITY_EXI_1_0 + XmlNetconfConstants.URN_IETF_PARAMS_NETCONF_CAPABILITY_EXI_1_0, + XmlNetconfConstants.URN_IETF_PARAMS_NETCONF_CAPABILITY_NOTIFICATION_1_0 ); public static final Set DEFAULT_BASE_CAPABILITIES = ImmutableSet.of( @@ -236,7 +237,12 @@ public class Configuration { @Deprecated public boolean isXmlConfigurationProvided() { - return initialConfigXMLFile != null && notificationFile != null; + return initialConfigXMLFile != null; + } + + @Deprecated + public boolean isNotificationsSupported() { + return notificationFile != null; } @Deprecated diff --git a/restconf/restconf-common-models/pom.xml b/restconf/restconf-common-models/pom.xml index 5a8c822e5f..2626228a1e 100644 --- a/restconf/restconf-common-models/pom.xml +++ b/restconf/restconf-common-models/pom.xml @@ -22,4 +22,10 @@ bundle ${project.artifactId} + + + org.opendaylight.mdsal.binding.model.ietf + rfc6991-ietf-inet-types + + diff --git a/restconf/restconf-common-models/src/main/yang/odl-device-notification.yang b/restconf/restconf-common-models/src/main/yang/odl-device-notification.yang new file mode 100644 index 0000000000..cf6849a99d --- /dev/null +++ b/restconf/restconf-common-models/src/main/yang/odl-device-notification.yang @@ -0,0 +1,45 @@ +module odl-device-notification { + namespace "urn:opendaylight:device:notification"; + prefix "device-notifi"; + + import ietf-inet-types { + prefix inet; + } + + organization "OpenDaylight"; + contact "Nikhil Soni "; + + description + "This module contains the definition of methods related to + device notification model + + Copyright (c)2022 Nikhil Soni and others. All rights reserved. + + This program and the accompanying materials are made available + under the terms of the Eclipse Public License v1.0 which + accompanies this distribution, and is available at + http://www.eclipse.org/legal/epl-v10.html"; + + revision "2022-11-06" { + description + "Initial revision"; + } + + rpc subscribe-device-notification { + description + "Subscribe to notifications on specified device."; + + input { + leaf path { + type instance-identifier; + description "Device mount point path"; + } + } + output { + leaf stream-path { + type inet:uri; + description "Device Notification stream URL"; + } + } + } +} diff --git a/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/RestconfApplication.java b/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/RestconfApplication.java index 383926b319..3405f12272 100644 --- a/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/RestconfApplication.java +++ b/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/RestconfApplication.java @@ -37,7 +37,7 @@ public class RestconfApplication extends AbstractRestconfApplication { streamSubscription, new RestconfDataServiceImpl(databindProvider, dataBroker, mountPointService, streamSubscription, actionService, configuration), - new RestconfInvokeOperationsServiceImpl(rpcService), + new RestconfInvokeOperationsServiceImpl(rpcService, mountPointService, configuration), new RestconfOperationsServiceImpl(databindProvider, mountPointService), new RestconfSchemaServiceImpl(domSchemaService, mountPointService), new RestconfImpl(databindProvider))); diff --git a/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/rests/services/impl/CreateStreamUtil.java b/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/rests/services/impl/CreateStreamUtil.java index 11bc447fad..17c22a45f9 100644 --- a/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/rests/services/impl/CreateStreamUtil.java +++ b/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/rests/services/impl/CreateStreamUtil.java @@ -10,17 +10,27 @@ package org.opendaylight.restconf.nb.rfc8040.rests.services.impl; import static java.util.Objects.requireNonNull; import com.google.common.collect.ImmutableSet; +import java.util.Collection; import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; import org.eclipse.jdt.annotation.Nullable; import org.opendaylight.mdsal.common.api.LogicalDatastoreType; +import org.opendaylight.mdsal.dom.api.DOMMountPoint; +import org.opendaylight.mdsal.dom.api.DOMMountPointService; +import org.opendaylight.mdsal.dom.api.DOMNotificationService; import org.opendaylight.mdsal.dom.api.DOMRpcResult; +import org.opendaylight.mdsal.dom.api.DOMSchemaService; import org.opendaylight.mdsal.dom.spi.DefaultDOMRpcResult; import org.opendaylight.restconf.common.errors.RestconfDocumentedException; import org.opendaylight.restconf.nb.rfc8040.legacy.NormalizedNodePayload; import org.opendaylight.restconf.nb.rfc8040.rests.utils.RestconfStreamsConstants; +import org.opendaylight.restconf.nb.rfc8040.streams.listeners.DeviceNotificationListenerAdaptor; import org.opendaylight.restconf.nb.rfc8040.streams.listeners.ListenersBroker; import org.opendaylight.restconf.nb.rfc8040.streams.listeners.NotificationListenerAdapter; import org.opendaylight.restconf.nb.rfc8040.utils.parser.IdentifierCodec; +import org.opendaylight.yang.gen.v1.urn.opendaylight.device.notification.rev221106.SubscribeDeviceNotificationInput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.device.notification.rev221106.SubscribeDeviceNotificationOutput; import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev140708.CreateDataChangeEventSubscriptionInput1.Scope; import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev140708.NotificationOutputTypeGrouping; import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev140708.NotificationOutputTypeGrouping.NotificationOutputType; @@ -50,16 +60,23 @@ import org.slf4j.LoggerFactory; final class CreateStreamUtil { private static final Logger LOG = LoggerFactory.getLogger(CreateStreamUtil.class); private static final QNameModule SAL_REMOTE_AUGMENT = NotificationOutputTypeGrouping.QNAME.getModule(); + + private static final QNameModule DEVICE_NOTIFICATION_MODULE = SubscribeDeviceNotificationInput.QNAME.getModule(); private static final QName DATASTORE_QNAME = QName.create(SAL_REMOTE_AUGMENT, RestconfStreamsConstants.DATASTORE_PARAM_NAME).intern(); private static final QName SCOPE_QNAME = QName.create(SAL_REMOTE_AUGMENT, RestconfStreamsConstants.SCOPE_PARAM_NAME).intern(); private static final QName OUTPUT_TYPE_QNAME = QName.create(SAL_REMOTE_AUGMENT, "notification-output-type").intern(); + private static final QName DEVICE_NOTIFICATION_PATH_QNAME = + QName.create(DEVICE_NOTIFICATION_MODULE, "path").intern(); + private static final QName DEVICE_NOTIFICATION_STREAM_PATH = + QName.create(DEVICE_NOTIFICATION_PATH_QNAME, "stream-path").intern(); private static final NodeIdentifier DATASTORE_NODEID = NodeIdentifier.create(DATASTORE_QNAME); private static final NodeIdentifier SCOPE_NODEID = NodeIdentifier.create(SCOPE_QNAME); private static final NodeIdentifier OUTPUT_TYPE_NODEID = NodeIdentifier.create(OUTPUT_TYPE_QNAME); - + private static final NodeIdentifier DEVICE_NOTIFICATION_PATH_NODEID = + NodeIdentifier.create(DEVICE_NOTIFICATION_PATH_QNAME); private static final AugmentationIdentifier SAL_REMOTE_AUG_IDENTIFIER = new AugmentationIdentifier( ImmutableSet.of(SCOPE_QNAME, DATASTORE_QNAME, OUTPUT_TYPE_QNAME)); @@ -123,6 +140,64 @@ final class CreateStreamUtil { .build()); } + /** + * Create device notification stream. + * + * @param baseUrl base Url + * @param payload data + * @param refSchemaCtx Reference to {@link EffectiveModelContext}. + * @param streamUtil stream utility + * @param mountPointService dom mount point service + * @return {@link DOMRpcResult} - Output of RPC - example in JSON + */ + static DOMRpcResult createDeviceNotificationListener(final String baseUrl, final NormalizedNodePayload payload, + final EffectiveModelContext refSchemaCtx, final SubscribeToStreamUtil streamUtil, + final DOMMountPointService mountPointService) { + // parsing out of container with settings and path + final ContainerNode data = (ContainerNode) requireNonNull(payload).getData(); + final YangInstanceIdentifier value = + (YangInstanceIdentifier) data.findChildByArg(DEVICE_NOTIFICATION_PATH_NODEID) + .map(DataContainerChild::body) + .orElseThrow(() -> new RestconfDocumentedException("Mount point not available", ErrorType.APPLICATION, + ErrorTag.OPERATION_FAILED)); + final DOMMountPoint mountPoint = mountPointService.getMountPoint(value) + .orElseThrow(() -> new RestconfDocumentedException("Mount point not available", ErrorType.APPLICATION, + ErrorTag.OPERATION_FAILED)); + + final String deviceName = extractDeviceName(value); + final NotificationOutputType outputType = prepareOutputType(data); + EffectiveModelContext effectiveModelContext = mountPoint.getService(DOMSchemaService.class).get() + .getGlobalContext(); + Collection notificationDefinitions = mountPoint.getService( + DOMSchemaService.class).get().getGlobalContext() + .getNotifications(); + if (notificationDefinitions == null || notificationDefinitions.isEmpty()) { + throw new RestconfDocumentedException("Device does not support notification", ErrorType.APPLICATION, + ErrorTag.OPERATION_FAILED); + } + + Set absolutes = notificationDefinitions.stream() + .map(notificationDefinition -> Absolute.of(notificationDefinition.getQName())) + .collect(Collectors.toUnmodifiableSet()); + + final DeviceNotificationListenerAdaptor notificationListenerAdapter = ListenersBroker.getInstance() + .registerDeviceNotificationListener(deviceName, outputType, effectiveModelContext, mountPointService, + mountPoint.getIdentifier()); + notificationListenerAdapter.listen(mountPoint.getService(DOMNotificationService.class).get(), absolutes); + + // building of output + return new DefaultDOMRpcResult(Builders.containerBuilder() + .withNodeIdentifier(new NodeIdentifier(SubscribeDeviceNotificationOutput.QNAME)) + .withChild(ImmutableNodes.leafNode(DEVICE_NOTIFICATION_STREAM_PATH, baseUrl + deviceName + + "?" + RestconfStreamsConstants.NOTIFICATION_TYPE + "=" + RestconfStreamsConstants.DEVICE)) + .build()); + } + + private static String extractDeviceName(final YangInstanceIdentifier iid) { + return ((YangInstanceIdentifier.NodeIdentifierWithPredicates.Singleton)iid.getLastPathArgument()) + .values().getElement().toString(); + } + /** * Prepare {@link NotificationOutputType}. * diff --git a/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/rests/services/impl/RestconfDataStreamServiceImpl.java b/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/rests/services/impl/RestconfDataStreamServiceImpl.java index 0862e2eab2..8307f9138e 100644 --- a/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/rests/services/impl/RestconfDataStreamServiceImpl.java +++ b/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/rests/services/impl/RestconfDataStreamServiceImpl.java @@ -16,6 +16,7 @@ import javax.ws.rs.sse.SseEventSink; import org.opendaylight.controller.config.threadpool.ScheduledThreadPool; import org.opendaylight.restconf.common.errors.RestconfDocumentedException; import org.opendaylight.restconf.nb.rfc8040.rests.services.api.RestconfDataStreamService; +import org.opendaylight.restconf.nb.rfc8040.rests.utils.RestconfStreamsConstants; import org.opendaylight.restconf.nb.rfc8040.streams.Configuration; import org.opendaylight.restconf.nb.rfc8040.streams.SSESessionHandler; import org.opendaylight.restconf.nb.rfc8040.streams.listeners.BaseListenerInterface; @@ -48,14 +49,25 @@ public class RestconfDataStreamServiceImpl implements RestconfDataStreamService @Override public void getSSE(final String identifier, final UriInfo uriInfo, final SseEventSink sink, final Sse sse) { final String streamName = ListenersBroker.createStreamNameFromUri(identifier); - final BaseListenerInterface listener = listenersBroker.getListenerFor(streamName) - .orElseThrow(() -> { - LOG.debug("Listener for stream with name {} was not found.", streamName); - throw new RestconfDocumentedException("Data missing", ErrorType.APPLICATION, ErrorTag.DATA_MISSING); - }); + final BaseListenerInterface listener; + final String notificaionType = + uriInfo.getQueryParameters().getFirst(RestconfStreamsConstants.NOTIFICATION_TYPE); + if (notificaionType != null && notificaionType.equals(RestconfStreamsConstants.DEVICE)) { + listener = listenersBroker.getDeviceNotificationListenerFor(streamName) + .orElseThrow(() -> { + LOG.debug("Listener for device path with name {} was not found.", streamName); + throw new RestconfDocumentedException("Data missing", ErrorType.APPLICATION, + ErrorTag.DATA_MISSING); + }); + } else { + listener = listenersBroker.getListenerFor(streamName) + .orElseThrow(() -> { + LOG.debug("Listener for stream with name {} was not found.", streamName); + throw new RestconfDocumentedException("Data missing", ErrorType.APPLICATION, ErrorTag.DATA_MISSING); + }); + } LOG.debug("Listener for stream with name {} has been found, SSE session handler will be created.", streamName); - // FIXME: invert control here: we should call 'listener.addSession()', which in turn should call // handler.init()/handler.close() final SSESessionHandler handler = new SSESessionHandler(executorService, sink, sse, listener, diff --git a/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/rests/services/impl/RestconfInvokeOperationsServiceImpl.java b/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/rests/services/impl/RestconfInvokeOperationsServiceImpl.java index fff01bb2bd..24387b0f52 100644 --- a/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/rests/services/impl/RestconfInvokeOperationsServiceImpl.java +++ b/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/rests/services/impl/RestconfInvokeOperationsServiceImpl.java @@ -24,6 +24,7 @@ import javax.ws.rs.core.Response.Status; import javax.ws.rs.core.UriInfo; import org.eclipse.jdt.annotation.NonNull; import org.opendaylight.mdsal.dom.api.DOMMountPoint; +import org.opendaylight.mdsal.dom.api.DOMMountPointService; import org.opendaylight.mdsal.dom.api.DOMRpcException; import org.opendaylight.mdsal.dom.api.DOMRpcResult; import org.opendaylight.mdsal.dom.api.DOMRpcService; @@ -32,6 +33,8 @@ import org.opendaylight.restconf.common.context.InstanceIdentifierContext; import org.opendaylight.restconf.common.errors.RestconfDocumentedException; import org.opendaylight.restconf.nb.rfc8040.legacy.NormalizedNodePayload; import org.opendaylight.restconf.nb.rfc8040.rests.services.api.RestconfInvokeOperationsService; +import org.opendaylight.restconf.nb.rfc8040.streams.Configuration; +import org.opendaylight.yang.gen.v1.urn.opendaylight.device.notification.rev221106.SubscribeDeviceNotificationInput; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.remote.rev140114.CreateDataChangeEventSubscriptionInput; import org.opendaylight.yangtools.yang.common.ErrorTag; import org.opendaylight.yangtools.yang.common.ErrorType; @@ -57,11 +60,18 @@ public class RestconfInvokeOperationsServiceImpl implements RestconfInvokeOperat // FIXME: at some point we do not want to have this here, as this is only used for dispatch private static final QNameModule SAL_REMOTE_NAMESPACE = CreateDataChangeEventSubscriptionInput.QNAME.getModule(); + private static final QNameModule DEVICE_NOTIFICATION_NAMESPACE = SubscribeDeviceNotificationInput.QNAME.getModule(); private final DOMRpcService rpcService; + private final DOMMountPointService mountPointService; + private final SubscribeToStreamUtil streamUtils; - public RestconfInvokeOperationsServiceImpl(final DOMRpcService rpcService) { + public RestconfInvokeOperationsServiceImpl(final DOMRpcService rpcService, + final DOMMountPointService mountPointService, final Configuration configuration) { this.rpcService = requireNonNull(rpcService); + this.mountPointService = requireNonNull(mountPointService); + streamUtils = configuration.isUseSSE() ? SubscribeToStreamUtil.serverSentEvents() + : SubscribeToStreamUtil.webSockets(); } @Override @@ -85,6 +95,12 @@ public class RestconfInvokeOperationsServiceImpl implements RestconfInvokeOperat future = Futures.immediateFailedFuture(new RestconfDocumentedException("Unsupported operation", ErrorType.RPC, ErrorTag.OPERATION_NOT_SUPPORTED)); } + } else if (DEVICE_NOTIFICATION_NAMESPACE.equals(rpcName.getModule())) { + // FIXME: this should be a match on RPC QName + final String baseUrl = streamUtils.prepareUriByStreamName(uriInfo, "").toString(); + future = Futures.immediateFuture( + CreateStreamUtil.createDeviceNotificationListener(baseUrl, payload, schemaContext, streamUtils, + mountPointService)); } else { future = invokeRpc((ContainerNode)payload.getData(), rpcName, rpcService); } diff --git a/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/rests/utils/RestconfStreamsConstants.java b/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/rests/utils/RestconfStreamsConstants.java index 1a08585272..26911539a9 100644 --- a/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/rests/utils/RestconfStreamsConstants.java +++ b/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/rests/utils/RestconfStreamsConstants.java @@ -22,6 +22,8 @@ public final class RestconfStreamsConstants { public static final String STREAM_PATH = STREAMS_PATH + STREAM_PATH_PART; public static final String STREAM_ACCESS_PATH_PART = "/access="; public static final String STREAM_LOCATION_PATH_PART = "/location"; + public static final String NOTIFICATION_TYPE = "notificationType"; + public static final String DEVICE = "device"; private RestconfStreamsConstants() { // Hidden on purpose diff --git a/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/listeners/AbstractCommonSubscriber.java b/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/listeners/AbstractCommonSubscriber.java index 82e7a6219d..158511c3d1 100644 --- a/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/listeners/AbstractCommonSubscriber.java +++ b/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/listeners/AbstractCommonSubscriber.java @@ -12,6 +12,7 @@ import static com.google.common.base.Preconditions.checkState; import static java.util.Objects.requireNonNull; import com.google.common.base.MoreObjects; +import com.google.common.base.MoreObjects.ToStringHelper; import java.time.Instant; import java.time.format.DateTimeFormatter; import java.time.format.DateTimeFormatterBuilder; @@ -39,7 +40,7 @@ import org.slf4j.LoggerFactory; /** * Features of subscribing part of both notifications. */ -abstract class AbstractCommonSubscriber extends AbstractNotificationsData implements BaseListenerInterface { +abstract class AbstractCommonSubscriber extends AbstractNotificationsData implements BaseListenerInterface { private static final Logger LOG = LoggerFactory.getLogger(AbstractCommonSubscriber.class); private static final DateTimeFormatter FORMATTER = new DateTimeFormatterBuilder() .appendValue(ChronoField.YEAR, 4).appendLiteral('-') @@ -54,7 +55,6 @@ abstract class AbstractCommonSubscriber extends AbstractNotificationsData private final EventFormatterFactory formatterFactory; private final NotificationOutputType outputType; private final String streamName; - private final P path; @GuardedBy("this") private final Set subscribers = new HashSet<>(); @@ -69,12 +69,11 @@ abstract class AbstractCommonSubscriber extends AbstractNotificationsData private boolean changedLeafNodesOnly = false; private EventFormatter formatter; - AbstractCommonSubscriber(final QName lastQName, final String streamName, final P path, - final NotificationOutputType outputType, final EventFormatterFactory formatterFactory) { + AbstractCommonSubscriber(final QName lastQName, final String streamName, final NotificationOutputType outputType, + final EventFormatterFactory formatterFactory) { super(lastQName); this.streamName = requireNonNull(streamName); checkArgument(!streamName.isEmpty()); - this.path = requireNonNull(path); this.outputType = requireNonNull(outputType); this.formatterFactory = requireNonNull(formatterFactory); @@ -168,10 +167,6 @@ abstract class AbstractCommonSubscriber extends AbstractNotificationsData } } - final P path() { - return path; - } - /** * Check whether this query should only notify about leaf node changes. * @@ -271,11 +266,11 @@ abstract class AbstractCommonSubscriber extends AbstractNotificationsData @Override public final String toString() { - return MoreObjects.toStringHelper(this) - .add("path", path) - .add("stream-name", streamName) - .add("output-type", getOutputType()) - .toString(); + return addToStringAttributes(MoreObjects.toStringHelper(this)).toString(); + } + + ToStringHelper addToStringAttributes(final ToStringHelper helper) { + return helper.add("stream-name", streamName).add("output-type", getOutputType()); } /** diff --git a/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/listeners/AbstractNotificationListenerAdaptor.java b/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/listeners/AbstractNotificationListenerAdaptor.java new file mode 100644 index 0000000000..08132ea683 --- /dev/null +++ b/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/listeners/AbstractNotificationListenerAdaptor.java @@ -0,0 +1,65 @@ +/* + * Copyright (c) 2022 PANTHEON.tech, s.r.o. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.restconf.nb.rfc8040.streams.listeners; + +import java.time.Instant; +import java.util.Optional; +import org.eclipse.jdt.annotation.NonNull; +import org.opendaylight.mdsal.dom.api.DOMEvent; +import org.opendaylight.mdsal.dom.api.DOMNotification; +import org.opendaylight.mdsal.dom.api.DOMNotificationListener; +import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev140708.NotificationOutputTypeGrouping.NotificationOutputType; +import org.opendaylight.yangtools.yang.common.QName; +import org.opendaylight.yangtools.yang.data.codec.gson.JSONCodecFactorySupplier; +import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Abstract base class for functionality shared between {@link NotificationListenerAdapter} and + * {@link DeviceNotificationListenerAdaptor}. + */ +abstract class AbstractNotificationListenerAdaptor extends AbstractCommonSubscriber + implements DOMNotificationListener { + private static final Logger LOG = LoggerFactory.getLogger(AbstractNotificationListenerAdaptor.class); + private static final NotificationFormatterFactory JSON_FORMATTER_FACTORY = + JSONNotificationFormatter.createFactory(JSONCodecFactorySupplier.RFC7951); + + AbstractNotificationListenerAdaptor(final QName lastQName, final String streamName, + final NotificationOutputType outputType) { + super(lastQName, streamName, outputType, getFormatterFactory(outputType)); + } + + private static NotificationFormatterFactory getFormatterFactory(final NotificationOutputType outputType) { + return switch (outputType) { + case JSON -> JSON_FORMATTER_FACTORY; + case XML -> XMLNotificationFormatter.FACTORY; + }; + } + + @Override + @SuppressWarnings("checkstyle:IllegalCatch") + public final void onNotification(final DOMNotification notification) { + final var eventInstant = notification instanceof DOMEvent domEvent ? domEvent.getEventInstant() : Instant.now(); + if (!checkStartStop(eventInstant)) { + return; + } + + final Optional maybeOutput; + try { + maybeOutput = formatter().eventData(effectiveModel(), notification, eventInstant, getLeafNodesOnly(), + isSkipNotificationData(), getChangedLeafNodesOnly()); + } catch (Exception e) { + LOG.error("Failed to process notification {}", notification, e); + return; + } + maybeOutput.ifPresent(this::post); + } + + abstract @NonNull EffectiveModelContext effectiveModel(); +} diff --git a/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/listeners/DeviceNotificationListenerAdaptor.java b/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/listeners/DeviceNotificationListenerAdaptor.java new file mode 100644 index 0000000000..3922c0ba4f --- /dev/null +++ b/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/listeners/DeviceNotificationListenerAdaptor.java @@ -0,0 +1,96 @@ +/* + * Copyright (c) 2022 Opendaylight, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.restconf.nb.rfc8040.streams.listeners; + +import static java.util.Objects.requireNonNull; + +import java.util.Set; +import org.eclipse.jdt.annotation.NonNull; +import org.opendaylight.mdsal.dom.api.DOMMountPointListener; +import org.opendaylight.mdsal.dom.api.DOMMountPointService; +import org.opendaylight.mdsal.dom.api.DOMNotificationService; +import org.opendaylight.restconf.nb.rfc8040.streams.SSESessionHandler; +import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev140708.NotificationOutputTypeGrouping.NotificationOutputType; +import org.opendaylight.yangtools.concepts.ListenerRegistration; +import org.opendaylight.yangtools.yang.common.QName; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext; +import org.opendaylight.yangtools.yang.model.api.stmt.SchemaNodeIdentifier.Absolute; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * {@link DeviceNotificationListenerAdaptor} is responsible to track events on notifications. + */ +public final class DeviceNotificationListenerAdaptor extends AbstractNotificationListenerAdaptor implements + DOMMountPointListener { + + private static final Logger LOG = LoggerFactory.getLogger(DeviceNotificationListenerAdaptor.class); + private final @NonNull EffectiveModelContext effectiveModel; + + private final @NonNull DOMMountPointService mountPointService; + + private ListenerRegistration reg; + private final YangInstanceIdentifier instanceIdentifier; + + + public DeviceNotificationListenerAdaptor(final String streamName, final NotificationOutputType outputType, + final EffectiveModelContext effectiveModel, final DOMMountPointService mountPointService, + final YangInstanceIdentifier path) { + // FIXME: a dummy QName due to contracts + super(QName.create("dummy", "dummy"), streamName, outputType); + this.effectiveModel = requireNonNull(effectiveModel); + this.mountPointService = requireNonNull(mountPointService); + instanceIdentifier = requireNonNull(path); + } + + public synchronized void listen(final DOMNotificationService notificationService, final Set paths) { + if (!isListening()) { + setRegistration(notificationService.registerNotificationListener(this, paths)); + reg = mountPointService.registerProvisionListener(this); + } + } + + private synchronized void resetListenerRegistration() { + if (reg != null) { + reg.close(); + reg = null; + } + } + + @Override + EffectiveModelContext effectiveModel() { + return effectiveModel; + } + + @Override + public void onMountPointCreated(final YangInstanceIdentifier path) { + // No-op + } + + @Override + public void onMountPointRemoved(final YangInstanceIdentifier path) { + if (instanceIdentifier.equals(path)) { + getSubscribers().forEach(subscriber -> { + if (subscriber.isConnected()) { + subscriber.sendDataMessage("Device disconnected"); + } + if (subscriber instanceof SSESessionHandler sseSessionHandler) { + try { + sseSessionHandler.close(); + } catch (IllegalStateException e) { + LOG.warn("Ignoring exception while closing sse session"); + } + } + }); + ListenersBroker.getInstance().removeAndCloseDeviceNotificationListener(this); + resetListenerRegistration(); + } + } + +} diff --git a/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/listeners/ListenerAdapter.java b/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/listeners/ListenerAdapter.java index 700b1205d6..a6e3306f44 100644 --- a/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/listeners/ListenerAdapter.java +++ b/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/listeners/ListenerAdapter.java @@ -7,7 +7,10 @@ */ package org.opendaylight.restconf.nb.rfc8040.streams.listeners; +import static java.util.Objects.requireNonNull; + import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.MoreObjects.ToStringHelper; import java.time.Instant; import java.util.Collection; import java.util.List; @@ -28,12 +31,14 @@ import org.slf4j.LoggerFactory; /** * {@link ListenerAdapter} is responsible to track events, which occurred by changing data in data source. */ -public class ListenerAdapter extends AbstractCommonSubscriber> +public class ListenerAdapter extends AbstractCommonSubscriber> implements ClusteredDOMDataTreeChangeListener { private static final Logger LOG = LoggerFactory.getLogger(ListenerAdapter.class); private static final DataTreeCandidateFormatterFactory JSON_FORMATTER_FACTORY = JSONDataTreeCandidateFormatter.createFactory(JSONCodecFactorySupplier.RFC7951); + private final YangInstanceIdentifier path; + /** * Creates new {@link ListenerAdapter} listener specified by path and stream name and register for subscribing. * @@ -44,7 +49,8 @@ public class ListenerAdapter extends AbstractCommonSubscriber dataChangeListeners = HashBiMap.create(); private final BiMap notificationListeners = HashBiMap.create(); + private final BiMap deviceNotificationListeners = HashBiMap.create(); + private ListenersBroker() { @@ -115,6 +120,22 @@ public final class ListenersBroker { } } + /** + * Get listener for device path. + * + * @param path name. + * @return {@link NotificationListenerAdapter} or {@link ListenerAdapter} object wrapped in {@link Optional} + * or {@link Optional#empty()} if listener with specified path doesn't exist. + */ + public Optional getDeviceNotificationListenerFor(final String path) { + final long stamp = deviceNotificationListenersLock.readLock(); + try { + return Optional.ofNullable(deviceNotificationListeners.get(requireNonNull(path))); + } finally { + deviceNotificationListenersLock.unlockRead(stamp); + } + } + /** * Get listener for stream-name. * @@ -180,6 +201,30 @@ public final class ListenersBroker { } } + /** + * Creates new {@link DeviceNotificationListenerAdaptor} listener using input stream name and schema path + * if such listener haven't been created yet. + * + * @param streamName Stream name. + * @param outputType Specific type of output for notifications - XML or JSON. + * @param refSchemaCtx Schema context of node + * @param mountPointService Mount point service + * @return Created or existing device notification listener adapter. + */ + public DeviceNotificationListenerAdaptor registerDeviceNotificationListener(final String streamName, + final NotificationOutputType outputType, final EffectiveModelContext refSchemaCtx, + final DOMMountPointService mountPointService, final YangInstanceIdentifier path) { + + final long stamp = deviceNotificationListenersLock.writeLock(); + try { + return deviceNotificationListeners.computeIfAbsent(streamName, + stream -> new DeviceNotificationListenerAdaptor(streamName, outputType, refSchemaCtx, + mountPointService, path)); + } finally { + deviceNotificationListenersLock.unlockWrite(stamp); + } + } + /** * Removal and closing of all data-change-event and notification listeners. */ @@ -302,6 +347,27 @@ public final class ListenersBroker { } } + /** + * Removes and closes device notification listener of type {@link NotificationListenerAdapter} + * specified in parameter. + * + * @param listener Listener to be closed and removed. + */ + @SuppressWarnings("checkstyle:IllegalCatch") + public void removeAndCloseDeviceNotificationListener(final DeviceNotificationListenerAdaptor listener) { + final long stamp = deviceNotificationListenersLock.writeLock(); + try { + requireNonNull(listener); + if (deviceNotificationListeners.inverse().remove(listener) == null) { + LOG.warn("There isn't any device notification stream that would match listener adapter {}.", listener); + } + } catch (final Exception exception) { + LOG.error("Device Notification listener {} cannot be closed.", listener, exception); + } finally { + deviceNotificationListenersLock.unlockWrite(stamp); + } + } + private void removeAndCloseNotificationListenerTemplate(final NotificationListenerAdapter listener) { try { requireNonNull(listener).close(); diff --git a/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/listeners/NotificationListenerAdapter.java b/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/listeners/NotificationListenerAdapter.java index f954da4da3..f0fa51af76 100644 --- a/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/listeners/NotificationListenerAdapter.java +++ b/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/listeners/NotificationListenerAdapter.java @@ -7,26 +7,19 @@ */ package org.opendaylight.restconf.nb.rfc8040.streams.listeners; -import java.time.Instant; -import java.util.Optional; -import org.opendaylight.mdsal.dom.api.DOMNotification; -import org.opendaylight.mdsal.dom.api.DOMNotificationListener; +import static java.util.Objects.requireNonNull; + +import com.google.common.base.MoreObjects.ToStringHelper; import org.opendaylight.mdsal.dom.api.DOMNotificationService; import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev140708.NotificationOutputTypeGrouping.NotificationOutputType; -import org.opendaylight.yangtools.yang.data.codec.gson.JSONCodecFactorySupplier; +import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext; import org.opendaylight.yangtools.yang.model.api.stmt.SchemaNodeIdentifier.Absolute; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * {@link NotificationListenerAdapter} is responsible to track events on notifications. */ -public final class NotificationListenerAdapter extends AbstractCommonSubscriber - implements DOMNotificationListener { - - private static final Logger LOG = LoggerFactory.getLogger(NotificationListenerAdapter.class); - private static final NotificationFormatterFactory JSON_FORMATTER_FACTORY = JSONNotificationFormatter.createFactory( - JSONCodecFactorySupplier.RFC7951); +public final class NotificationListenerAdapter extends AbstractNotificationListenerAdaptor { + private final Absolute path; /** * Set path of listener and stream name. @@ -36,35 +29,13 @@ public final class NotificationListenerAdapter extends AbstractCommonSubscriber< * @param outputType Type of output on notification (JSON or XML). */ NotificationListenerAdapter(final Absolute path, final String streamName, final NotificationOutputType outputType) { - super(path.lastNodeIdentifier(), streamName, path, outputType, getFormatterFactory(outputType)); - } - - private static NotificationFormatterFactory getFormatterFactory(final NotificationOutputType outputType) { - return switch (outputType) { - case JSON -> JSON_FORMATTER_FACTORY; - case XML -> XMLNotificationFormatter.FACTORY; - }; + super(path.lastNodeIdentifier(), streamName, outputType); + this.path = requireNonNull(path); } @Override - @SuppressWarnings("checkstyle:IllegalCatch") - public void onNotification(final DOMNotification notification) { - final Instant now = Instant.now(); - if (!checkStartStop(now)) { - return; - } - - final Optional maybeOutput; - try { - maybeOutput = formatter().eventData(databindProvider.currentContext().modelContext(), notification, now, - getLeafNodesOnly(), isSkipNotificationData(), getChangedLeafNodesOnly()); - } catch (Exception e) { - LOG.error("Failed to process notification {}", notification, e); - return; - } - if (maybeOutput.isPresent()) { - post(maybeOutput.get()); - } + EffectiveModelContext effectiveModel() { + return databindProvider.currentContext().modelContext(); } /** @@ -73,7 +44,7 @@ public final class NotificationListenerAdapter extends AbstractCommonSubscriber< * @return The configured schema path that points to observing YANG notification schema node. */ public Absolute getSchemaPath() { - return path(); + return path; } public synchronized void listen(final DOMNotificationService notificationService) { @@ -81,4 +52,9 @@ public final class NotificationListenerAdapter extends AbstractCommonSubscriber< setRegistration(notificationService.registerNotificationListener(this, getSchemaPath())); } } + + @Override + ToStringHelper addToStringAttributes(final ToStringHelper helper) { + return super.addToStringAttributes(helper.add("path", path)); + } } diff --git a/restconf/restconf-nb/src/test/java/org/opendaylight/restconf/nb/rfc8040/rests/services/impl/RestconfInvokeOperationsServiceImplTest.java b/restconf/restconf-nb/src/test/java/org/opendaylight/restconf/nb/rfc8040/rests/services/impl/RestconfInvokeOperationsServiceImplTest.java index 1bc05d629e..97e0ceecf9 100644 --- a/restconf/restconf-nb/src/test/java/org/opendaylight/restconf/nb/rfc8040/rests/services/impl/RestconfInvokeOperationsServiceImplTest.java +++ b/restconf/restconf-nb/src/test/java/org/opendaylight/restconf/nb/rfc8040/rests/services/impl/RestconfInvokeOperationsServiceImplTest.java @@ -36,6 +36,7 @@ import org.mockito.ArgumentCaptor; import org.mockito.Mock; import org.mockito.junit.MockitoJUnitRunner; import org.opendaylight.mdsal.dom.api.DOMMountPoint; +import org.opendaylight.mdsal.dom.api.DOMMountPointService; import org.opendaylight.mdsal.dom.api.DOMRpcException; import org.opendaylight.mdsal.dom.api.DOMRpcImplementationNotAvailableException; import org.opendaylight.mdsal.dom.api.DOMRpcResult; @@ -45,6 +46,7 @@ import org.opendaylight.restconf.common.context.InstanceIdentifierContext; import org.opendaylight.restconf.common.errors.RestconfDocumentedException; import org.opendaylight.restconf.nb.rfc8040.TestRestconfUtils; import org.opendaylight.restconf.nb.rfc8040.legacy.NormalizedNodePayload; +import org.opendaylight.restconf.nb.rfc8040.streams.Configuration; import org.opendaylight.yangtools.yang.common.ErrorTag; import org.opendaylight.yangtools.yang.common.ErrorType; import org.opendaylight.yangtools.yang.common.QName; @@ -75,6 +77,8 @@ public class RestconfInvokeOperationsServiceImplTest { private DOMRpcService rpcService; @Mock private DOMMountPoint mountPoint; + @Mock + private DOMMountPointService mountPointService; private RestconfInvokeOperationsServiceImpl invokeOperationsService; @BeforeClass @@ -84,7 +88,8 @@ public class RestconfInvokeOperationsServiceImplTest { @Before public void setup() { - invokeOperationsService = new RestconfInvokeOperationsServiceImpl(rpcService); + invokeOperationsService = new RestconfInvokeOperationsServiceImpl(rpcService, mountPointService, + mock(Configuration.class)); } @Test -- 2.36.6