Netconf Device Notification 67/93967/48
authorNikhil Soni <nikk.sonitech@gmail.com>
Thu, 25 Aug 2022 11:36:59 +0000 (17:06 +0530)
committerRobert Varga <robert.varga@pantheon.tech>
Tue, 10 Jan 2023 12:23:40 +0000 (13:23 +0100)
Requirement - HTTP client should be able to get Device notification
after subscription.

Implementation:
1. Added a new Device Notification Yang for Rest request which takes
 input of Device mount point path
2. Added a DeviceNotificationListenerAdaptor which is get registered
in Device Notification service (on Mount point)
3. Created a dynamic Rest Notification SSE HTTP rest url (Response of 1st rest
call )
4. Updated Test-tool for Notification

JIRA: NETCONF-745
Change-Id: I9cac35d0a7b0f79382d223ae4ec0ac4a3ce2bae9
Signed-off-by: Nikhil Soni <nikk.sonitech@gmail.com>
Signed-off-by: nikhil.soni.lumina <nikk.sonitech@gmail.com>
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
16 files changed:
netconf/tools/netconf-testtool/src/main/java/org/opendaylight/netconf/test/tool/NetconfDeviceSimulator.java
netconf/tools/netconf-testtool/src/main/java/org/opendaylight/netconf/test/tool/config/Configuration.java
restconf/restconf-common-models/pom.xml
restconf/restconf-common-models/src/main/yang/odl-device-notification.yang [new file with mode: 0644]
restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/RestconfApplication.java
restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/rests/services/impl/CreateStreamUtil.java
restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/rests/services/impl/RestconfDataStreamServiceImpl.java
restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/rests/services/impl/RestconfInvokeOperationsServiceImpl.java
restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/rests/utils/RestconfStreamsConstants.java
restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/listeners/AbstractCommonSubscriber.java
restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/listeners/AbstractNotificationListenerAdaptor.java [new file with mode: 0644]
restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/listeners/DeviceNotificationListenerAdaptor.java [new file with mode: 0644]
restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/listeners/ListenerAdapter.java
restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/listeners/ListenersBroker.java
restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/listeners/NotificationListenerAdapter.java
restconf/restconf-nb/src/test/java/org/opendaylight/restconf/nb/rfc8040/rests/services/impl/RestconfInvokeOperationsServiceImplTest.java

index 6f1ba7a0e762e53b5a2dae0066f4a98064147ea5..0a92bc109ceee289be25725c20782bd7c35283a7 100644 (file)
@@ -144,6 +144,11 @@ public class NetconfDeviceSimulator implements Closeable {
             operationProvider = new SimulatedOperationProvider(idProvider, transformedCapabilities,
                     Optional.ofNullable(configuration.getNotificationFile()),
                     Optional.ofNullable(configuration.getInitialConfigXMLFile()));
+        } else if (configuration.isNotificationsSupported()) {
+            LOG.info("using SimulatedOperationProvider.");
+            operationProvider = new SimulatedOperationProvider(idProvider, transformedCapabilities,
+                    Optional.ofNullable(configuration.getNotificationFile()),
+                    Optional.empty());
         } else {
             LOG.info("using OperationsProvider.");
             operationProvider = new OperationsProvider(idProvider, transformedCapabilities,
index 4d9c40e70298d7a35b4dc7ba9c4ccdeac53760da..765ec49fa83450160f6c3124bd741b1ae57347d6 100644 (file)
@@ -28,7 +28,8 @@ public class Configuration {
     public static final Set<String> DEFAULT_BASE_CAPABILITIES_EXI = ImmutableSet.of(
             XmlNetconfConstants.URN_IETF_PARAMS_NETCONF_BASE_1_0,
             XmlNetconfConstants.URN_IETF_PARAMS_NETCONF_BASE_1_1,
-            XmlNetconfConstants.URN_IETF_PARAMS_NETCONF_CAPABILITY_EXI_1_0
+            XmlNetconfConstants.URN_IETF_PARAMS_NETCONF_CAPABILITY_EXI_1_0,
+            XmlNetconfConstants.URN_IETF_PARAMS_NETCONF_CAPABILITY_NOTIFICATION_1_0
     );
 
     public static final Set<String> DEFAULT_BASE_CAPABILITIES = ImmutableSet.of(
@@ -236,7 +237,12 @@ public class Configuration {
 
     @Deprecated
     public boolean isXmlConfigurationProvided() {
-        return initialConfigXMLFile != null && notificationFile != null;
+        return initialConfigXMLFile != null;
+    }
+
+    @Deprecated
+    public boolean isNotificationsSupported() {
+        return notificationFile != null;
     }
 
     @Deprecated
index 5a8c822e5fedd68a547e19ade1b78ddf9dac63f3..2626228a1e850bcfb5fe693f744add8394171deb 100644 (file)
   <packaging>bundle</packaging>
   <name>${project.artifactId}</name>
 
+  <dependencies>
+     <dependency>
+       <groupId>org.opendaylight.mdsal.binding.model.ietf</groupId>
+       <artifactId>rfc6991-ietf-inet-types</artifactId>
+     </dependency>
+  </dependencies>
 </project>
diff --git a/restconf/restconf-common-models/src/main/yang/odl-device-notification.yang b/restconf/restconf-common-models/src/main/yang/odl-device-notification.yang
new file mode 100644 (file)
index 0000000..cf6849a
--- /dev/null
@@ -0,0 +1,45 @@
+module odl-device-notification {
+  namespace "urn:opendaylight:device:notification";
+  prefix "device-notifi";
+
+  import ietf-inet-types {
+    prefix inet;
+  }
+
+  organization "OpenDaylight";
+  contact "Nikhil Soni <nikk.sonitech@gmail.com>";
+
+  description
+    "This module contains the definition of methods related to
+    device notification model
+
+     Copyright (c)2022 Nikhil Soni and others. All rights reserved.
+
+     This program and the accompanying materials are made available
+     under the terms of the Eclipse Public License v1.0 which
+     accompanies this distribution, and is available at
+     http://www.eclipse.org/legal/epl-v10.html";
+
+  revision "2022-11-06" {
+    description
+      "Initial revision";
+  }
+
+  rpc subscribe-device-notification {
+    description
+      "Subscribe to notifications on specified device.";
+
+    input {
+        leaf path {
+          type instance-identifier;
+          description "Device mount point path";
+        }
+     }
+     output {
+        leaf stream-path {
+          type inet:uri;
+          description "Device Notification stream URL";
+        }
+     }
+  }
+}
index 383926b31967d6814ee2613babf977e31372a259..3405f1227232d5d1333745bfb317eea95f71f7f2 100644 (file)
@@ -37,7 +37,7 @@ public class RestconfApplication extends AbstractRestconfApplication {
             streamSubscription,
             new RestconfDataServiceImpl(databindProvider, dataBroker, mountPointService, streamSubscription,
                 actionService, configuration),
-            new RestconfInvokeOperationsServiceImpl(rpcService),
+            new RestconfInvokeOperationsServiceImpl(rpcService, mountPointService, configuration),
             new RestconfOperationsServiceImpl(databindProvider, mountPointService),
             new RestconfSchemaServiceImpl(domSchemaService, mountPointService),
             new RestconfImpl(databindProvider)));
index 11bc447fada389b566520554ebed7ca8a72bf777..17c22a45f9468157da050b6bcfb29a0ae2402fad 100644 (file)
@@ -10,17 +10,27 @@ package org.opendaylight.restconf.nb.rfc8040.rests.services.impl;
 import static java.util.Objects.requireNonNull;
 
 import com.google.common.collect.ImmutableSet;
+import java.util.Collection;
 import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
 import org.eclipse.jdt.annotation.Nullable;
 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
+import org.opendaylight.mdsal.dom.api.DOMMountPoint;
+import org.opendaylight.mdsal.dom.api.DOMMountPointService;
+import org.opendaylight.mdsal.dom.api.DOMNotificationService;
 import org.opendaylight.mdsal.dom.api.DOMRpcResult;
+import org.opendaylight.mdsal.dom.api.DOMSchemaService;
 import org.opendaylight.mdsal.dom.spi.DefaultDOMRpcResult;
 import org.opendaylight.restconf.common.errors.RestconfDocumentedException;
 import org.opendaylight.restconf.nb.rfc8040.legacy.NormalizedNodePayload;
 import org.opendaylight.restconf.nb.rfc8040.rests.utils.RestconfStreamsConstants;
+import org.opendaylight.restconf.nb.rfc8040.streams.listeners.DeviceNotificationListenerAdaptor;
 import org.opendaylight.restconf.nb.rfc8040.streams.listeners.ListenersBroker;
 import org.opendaylight.restconf.nb.rfc8040.streams.listeners.NotificationListenerAdapter;
 import org.opendaylight.restconf.nb.rfc8040.utils.parser.IdentifierCodec;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.device.notification.rev221106.SubscribeDeviceNotificationInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.device.notification.rev221106.SubscribeDeviceNotificationOutput;
 import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev140708.CreateDataChangeEventSubscriptionInput1.Scope;
 import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev140708.NotificationOutputTypeGrouping;
 import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev140708.NotificationOutputTypeGrouping.NotificationOutputType;
@@ -50,16 +60,23 @@ import org.slf4j.LoggerFactory;
 final class CreateStreamUtil {
     private static final Logger LOG = LoggerFactory.getLogger(CreateStreamUtil.class);
     private static final QNameModule SAL_REMOTE_AUGMENT = NotificationOutputTypeGrouping.QNAME.getModule();
+
+    private static final QNameModule DEVICE_NOTIFICATION_MODULE = SubscribeDeviceNotificationInput.QNAME.getModule();
     private static final QName DATASTORE_QNAME =
         QName.create(SAL_REMOTE_AUGMENT, RestconfStreamsConstants.DATASTORE_PARAM_NAME).intern();
     private static final QName SCOPE_QNAME =
         QName.create(SAL_REMOTE_AUGMENT, RestconfStreamsConstants.SCOPE_PARAM_NAME).intern();
     private static final QName OUTPUT_TYPE_QNAME =
         QName.create(SAL_REMOTE_AUGMENT, "notification-output-type").intern();
+    private static final QName DEVICE_NOTIFICATION_PATH_QNAME =
+        QName.create(DEVICE_NOTIFICATION_MODULE, "path").intern();
+    private static final QName DEVICE_NOTIFICATION_STREAM_PATH =
+        QName.create(DEVICE_NOTIFICATION_PATH_QNAME, "stream-path").intern();
     private static final NodeIdentifier DATASTORE_NODEID = NodeIdentifier.create(DATASTORE_QNAME);
     private static final NodeIdentifier SCOPE_NODEID = NodeIdentifier.create(SCOPE_QNAME);
     private static final NodeIdentifier OUTPUT_TYPE_NODEID = NodeIdentifier.create(OUTPUT_TYPE_QNAME);
-
+    private static final NodeIdentifier DEVICE_NOTIFICATION_PATH_NODEID =
+        NodeIdentifier.create(DEVICE_NOTIFICATION_PATH_QNAME);
     private static final AugmentationIdentifier SAL_REMOTE_AUG_IDENTIFIER = new AugmentationIdentifier(
         ImmutableSet.of(SCOPE_QNAME, DATASTORE_QNAME, OUTPUT_TYPE_QNAME));
 
@@ -123,6 +140,64 @@ final class CreateStreamUtil {
             .build());
     }
 
+    /**
+     * Create device notification stream.
+     *
+     * @param baseUrl base Url
+     * @param payload data
+     * @param refSchemaCtx Reference to {@link EffectiveModelContext}.
+     * @param streamUtil stream utility
+     * @param mountPointService dom mount point service
+     * @return {@link DOMRpcResult} - Output of RPC - example in JSON
+     */
+    static DOMRpcResult createDeviceNotificationListener(final String baseUrl, final NormalizedNodePayload payload,
+            final EffectiveModelContext refSchemaCtx, final SubscribeToStreamUtil streamUtil,
+            final DOMMountPointService mountPointService) {
+        // parsing out of container with settings and path
+        final ContainerNode data = (ContainerNode) requireNonNull(payload).getData();
+        final YangInstanceIdentifier value =
+            (YangInstanceIdentifier) data.findChildByArg(DEVICE_NOTIFICATION_PATH_NODEID)
+            .map(DataContainerChild::body)
+            .orElseThrow(() -> new RestconfDocumentedException("Mount point not available", ErrorType.APPLICATION,
+                ErrorTag.OPERATION_FAILED));
+        final DOMMountPoint mountPoint = mountPointService.getMountPoint(value)
+            .orElseThrow(() -> new RestconfDocumentedException("Mount point not available", ErrorType.APPLICATION,
+                ErrorTag.OPERATION_FAILED));
+
+        final String deviceName = extractDeviceName(value);
+        final NotificationOutputType outputType = prepareOutputType(data);
+        EffectiveModelContext effectiveModelContext = mountPoint.getService(DOMSchemaService.class).get()
+            .getGlobalContext();
+        Collection<? extends NotificationDefinition> notificationDefinitions = mountPoint.getService(
+                DOMSchemaService.class).get().getGlobalContext()
+            .getNotifications();
+        if (notificationDefinitions == null || notificationDefinitions.isEmpty()) {
+            throw new RestconfDocumentedException("Device does not support notification", ErrorType.APPLICATION,
+                ErrorTag.OPERATION_FAILED);
+        }
+
+        Set<Absolute> absolutes = notificationDefinitions.stream()
+            .map(notificationDefinition -> Absolute.of(notificationDefinition.getQName()))
+            .collect(Collectors.toUnmodifiableSet());
+
+        final DeviceNotificationListenerAdaptor notificationListenerAdapter = ListenersBroker.getInstance()
+            .registerDeviceNotificationListener(deviceName, outputType, effectiveModelContext, mountPointService,
+                mountPoint.getIdentifier());
+        notificationListenerAdapter.listen(mountPoint.getService(DOMNotificationService.class).get(), absolutes);
+
+        // building of output
+        return new DefaultDOMRpcResult(Builders.containerBuilder()
+            .withNodeIdentifier(new NodeIdentifier(SubscribeDeviceNotificationOutput.QNAME))
+            .withChild(ImmutableNodes.leafNode(DEVICE_NOTIFICATION_STREAM_PATH, baseUrl + deviceName
+                + "?" + RestconfStreamsConstants.NOTIFICATION_TYPE + "=" + RestconfStreamsConstants.DEVICE))
+            .build());
+    }
+
+    private static String extractDeviceName(final YangInstanceIdentifier iid) {
+        return ((YangInstanceIdentifier.NodeIdentifierWithPredicates.Singleton)iid.getLastPathArgument())
+            .values().getElement().toString();
+    }
+
     /**
      * Prepare {@link NotificationOutputType}.
      *
index 0862e2eab2d87ce3c1d999c8b01dcf59b0717470..8307f9138e2d504c9380e6df385caca9e8d1397b 100644 (file)
@@ -16,6 +16,7 @@ import javax.ws.rs.sse.SseEventSink;
 import org.opendaylight.controller.config.threadpool.ScheduledThreadPool;
 import org.opendaylight.restconf.common.errors.RestconfDocumentedException;
 import org.opendaylight.restconf.nb.rfc8040.rests.services.api.RestconfDataStreamService;
+import org.opendaylight.restconf.nb.rfc8040.rests.utils.RestconfStreamsConstants;
 import org.opendaylight.restconf.nb.rfc8040.streams.Configuration;
 import org.opendaylight.restconf.nb.rfc8040.streams.SSESessionHandler;
 import org.opendaylight.restconf.nb.rfc8040.streams.listeners.BaseListenerInterface;
@@ -48,14 +49,25 @@ public class RestconfDataStreamServiceImpl implements RestconfDataStreamService
     @Override
     public void getSSE(final String identifier, final UriInfo uriInfo, final SseEventSink sink, final Sse sse) {
         final String streamName = ListenersBroker.createStreamNameFromUri(identifier);
-        final BaseListenerInterface listener = listenersBroker.getListenerFor(streamName)
-            .orElseThrow(() -> {
-                LOG.debug("Listener for stream with name {} was not found.", streamName);
-                throw new RestconfDocumentedException("Data missing", ErrorType.APPLICATION, ErrorTag.DATA_MISSING);
-            });
+        final BaseListenerInterface listener;
+        final String notificaionType =
+            uriInfo.getQueryParameters().getFirst(RestconfStreamsConstants.NOTIFICATION_TYPE);
+        if (notificaionType != null && notificaionType.equals(RestconfStreamsConstants.DEVICE)) {
+            listener = listenersBroker.getDeviceNotificationListenerFor(streamName)
+                .orElseThrow(() -> {
+                    LOG.debug("Listener for device path with name {} was not found.", streamName);
+                    throw new RestconfDocumentedException("Data missing", ErrorType.APPLICATION,
+                        ErrorTag.DATA_MISSING);
+                });
+        } else {
+            listener = listenersBroker.getListenerFor(streamName)
+                .orElseThrow(() -> {
+                    LOG.debug("Listener for stream with name {} was not found.", streamName);
+                    throw new RestconfDocumentedException("Data missing", ErrorType.APPLICATION, ErrorTag.DATA_MISSING);
+                });
+        }
 
         LOG.debug("Listener for stream with name {} has been found, SSE session handler will be created.", streamName);
-
         // FIXME: invert control here: we should call 'listener.addSession()', which in turn should call
         //        handler.init()/handler.close()
         final SSESessionHandler handler = new SSESessionHandler(executorService, sink, sse, listener,
index fff01bb2bddd7ce7458185dd6bc1856f411c46f4..24387b0f52473c967ca88327dc0279c4921dcbeb 100644 (file)
@@ -24,6 +24,7 @@ import javax.ws.rs.core.Response.Status;
 import javax.ws.rs.core.UriInfo;
 import org.eclipse.jdt.annotation.NonNull;
 import org.opendaylight.mdsal.dom.api.DOMMountPoint;
+import org.opendaylight.mdsal.dom.api.DOMMountPointService;
 import org.opendaylight.mdsal.dom.api.DOMRpcException;
 import org.opendaylight.mdsal.dom.api.DOMRpcResult;
 import org.opendaylight.mdsal.dom.api.DOMRpcService;
@@ -32,6 +33,8 @@ import org.opendaylight.restconf.common.context.InstanceIdentifierContext;
 import org.opendaylight.restconf.common.errors.RestconfDocumentedException;
 import org.opendaylight.restconf.nb.rfc8040.legacy.NormalizedNodePayload;
 import org.opendaylight.restconf.nb.rfc8040.rests.services.api.RestconfInvokeOperationsService;
+import org.opendaylight.restconf.nb.rfc8040.streams.Configuration;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.device.notification.rev221106.SubscribeDeviceNotificationInput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.remote.rev140114.CreateDataChangeEventSubscriptionInput;
 import org.opendaylight.yangtools.yang.common.ErrorTag;
 import org.opendaylight.yangtools.yang.common.ErrorType;
@@ -57,11 +60,18 @@ public class RestconfInvokeOperationsServiceImpl implements RestconfInvokeOperat
 
     // FIXME: at some point we do not want to have this here, as this is only used for dispatch
     private static final QNameModule SAL_REMOTE_NAMESPACE = CreateDataChangeEventSubscriptionInput.QNAME.getModule();
+    private static final QNameModule DEVICE_NOTIFICATION_NAMESPACE = SubscribeDeviceNotificationInput.QNAME.getModule();
 
     private final DOMRpcService rpcService;
+    private final DOMMountPointService mountPointService;
+    private final SubscribeToStreamUtil streamUtils;
 
-    public RestconfInvokeOperationsServiceImpl(final DOMRpcService rpcService) {
+    public RestconfInvokeOperationsServiceImpl(final DOMRpcService rpcService,
+            final DOMMountPointService mountPointService, final Configuration configuration) {
         this.rpcService = requireNonNull(rpcService);
+        this.mountPointService = requireNonNull(mountPointService);
+        streamUtils = configuration.isUseSSE() ? SubscribeToStreamUtil.serverSentEvents()
+            : SubscribeToStreamUtil.webSockets();
     }
 
     @Override
@@ -85,6 +95,12 @@ public class RestconfInvokeOperationsServiceImpl implements RestconfInvokeOperat
                     future = Futures.immediateFailedFuture(new RestconfDocumentedException("Unsupported operation",
                         ErrorType.RPC, ErrorTag.OPERATION_NOT_SUPPORTED));
                 }
+            } else if (DEVICE_NOTIFICATION_NAMESPACE.equals(rpcName.getModule())) {
+                // FIXME: this should be a match on RPC QName
+                final String baseUrl = streamUtils.prepareUriByStreamName(uriInfo, "").toString();
+                future = Futures.immediateFuture(
+                    CreateStreamUtil.createDeviceNotificationListener(baseUrl, payload, schemaContext, streamUtils,
+                        mountPointService));
             } else {
                 future = invokeRpc((ContainerNode)payload.getData(), rpcName, rpcService);
             }
index 1a08585272b623b2f9f373f0904ec765c25c54dd..26911539a926e9ea09ded7376c319311f64e2fc7 100644 (file)
@@ -22,6 +22,8 @@ public final class RestconfStreamsConstants {
     public static final String STREAM_PATH = STREAMS_PATH + STREAM_PATH_PART;
     public static final String STREAM_ACCESS_PATH_PART = "/access=";
     public static final String STREAM_LOCATION_PATH_PART = "/location";
+    public static final String NOTIFICATION_TYPE = "notificationType";
+    public static final String DEVICE = "device";
 
     private RestconfStreamsConstants() {
         // Hidden on purpose
index 82e7a6219dd32bf4196d0f8c5d91787a0110ae51..158511c3d18dd84d159e41888cebbd23636aef2f 100644 (file)
@@ -12,6 +12,7 @@ import static com.google.common.base.Preconditions.checkState;
 import static java.util.Objects.requireNonNull;
 
 import com.google.common.base.MoreObjects;
+import com.google.common.base.MoreObjects.ToStringHelper;
 import java.time.Instant;
 import java.time.format.DateTimeFormatter;
 import java.time.format.DateTimeFormatterBuilder;
@@ -39,7 +40,7 @@ import org.slf4j.LoggerFactory;
 /**
  * Features of subscribing part of both notifications.
  */
-abstract class AbstractCommonSubscriber<P, T> extends AbstractNotificationsData implements BaseListenerInterface {
+abstract class AbstractCommonSubscriber<T> extends AbstractNotificationsData implements BaseListenerInterface {
     private static final Logger LOG = LoggerFactory.getLogger(AbstractCommonSubscriber.class);
     private static final DateTimeFormatter FORMATTER = new DateTimeFormatterBuilder()
         .appendValue(ChronoField.YEAR, 4).appendLiteral('-')
@@ -54,7 +55,6 @@ abstract class AbstractCommonSubscriber<P, T> extends AbstractNotificationsData
     private final EventFormatterFactory<T> formatterFactory;
     private final NotificationOutputType outputType;
     private final String streamName;
-    private final P path;
 
     @GuardedBy("this")
     private final Set<StreamSessionHandler> subscribers = new HashSet<>();
@@ -69,12 +69,11 @@ abstract class AbstractCommonSubscriber<P, T> extends AbstractNotificationsData
     private boolean changedLeafNodesOnly = false;
     private EventFormatter<T> formatter;
 
-    AbstractCommonSubscriber(final QName lastQName, final String streamName, final P path,
-            final NotificationOutputType outputType, final EventFormatterFactory<T> formatterFactory) {
+    AbstractCommonSubscriber(final QName lastQName, final String streamName, final NotificationOutputType outputType,
+            final EventFormatterFactory<T> formatterFactory) {
         super(lastQName);
         this.streamName = requireNonNull(streamName);
         checkArgument(!streamName.isEmpty());
-        this.path = requireNonNull(path);
 
         this.outputType = requireNonNull(outputType);
         this.formatterFactory = requireNonNull(formatterFactory);
@@ -168,10 +167,6 @@ abstract class AbstractCommonSubscriber<P, T> extends AbstractNotificationsData
         }
     }
 
-    final P path() {
-        return path;
-    }
-
     /**
      * Check whether this query should only notify about leaf node changes.
      *
@@ -271,11 +266,11 @@ abstract class AbstractCommonSubscriber<P, T> extends AbstractNotificationsData
 
     @Override
     public final String toString() {
-        return MoreObjects.toStringHelper(this)
-            .add("path", path)
-            .add("stream-name", streamName)
-            .add("output-type", getOutputType())
-            .toString();
+        return addToStringAttributes(MoreObjects.toStringHelper(this)).toString();
+    }
+
+    ToStringHelper addToStringAttributes(final ToStringHelper helper) {
+        return helper.add("stream-name", streamName).add("output-type", getOutputType());
     }
 
     /**
diff --git a/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/listeners/AbstractNotificationListenerAdaptor.java b/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/listeners/AbstractNotificationListenerAdaptor.java
new file mode 100644 (file)
index 0000000..08132ea
--- /dev/null
@@ -0,0 +1,65 @@
+/*
+ * Copyright (c) 2022 PANTHEON.tech, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.restconf.nb.rfc8040.streams.listeners;
+
+import java.time.Instant;
+import java.util.Optional;
+import org.eclipse.jdt.annotation.NonNull;
+import org.opendaylight.mdsal.dom.api.DOMEvent;
+import org.opendaylight.mdsal.dom.api.DOMNotification;
+import org.opendaylight.mdsal.dom.api.DOMNotificationListener;
+import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev140708.NotificationOutputTypeGrouping.NotificationOutputType;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.data.codec.gson.JSONCodecFactorySupplier;
+import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Abstract base class for functionality shared between {@link NotificationListenerAdapter} and
+ * {@link DeviceNotificationListenerAdaptor}.
+ */
+abstract class AbstractNotificationListenerAdaptor extends AbstractCommonSubscriber<DOMNotification>
+        implements DOMNotificationListener {
+    private static final Logger LOG = LoggerFactory.getLogger(AbstractNotificationListenerAdaptor.class);
+    private static final NotificationFormatterFactory JSON_FORMATTER_FACTORY =
+        JSONNotificationFormatter.createFactory(JSONCodecFactorySupplier.RFC7951);
+
+    AbstractNotificationListenerAdaptor(final QName lastQName, final String streamName,
+            final NotificationOutputType outputType) {
+        super(lastQName, streamName, outputType, getFormatterFactory(outputType));
+    }
+
+    private static NotificationFormatterFactory getFormatterFactory(final NotificationOutputType outputType) {
+        return switch (outputType) {
+            case JSON -> JSON_FORMATTER_FACTORY;
+            case XML -> XMLNotificationFormatter.FACTORY;
+        };
+    }
+
+    @Override
+    @SuppressWarnings("checkstyle:IllegalCatch")
+    public final void onNotification(final DOMNotification notification) {
+        final var eventInstant = notification instanceof DOMEvent domEvent ? domEvent.getEventInstant() : Instant.now();
+        if (!checkStartStop(eventInstant)) {
+            return;
+        }
+
+        final Optional<String> maybeOutput;
+        try {
+            maybeOutput = formatter().eventData(effectiveModel(), notification, eventInstant, getLeafNodesOnly(),
+                isSkipNotificationData(), getChangedLeafNodesOnly());
+        } catch (Exception e) {
+            LOG.error("Failed to process notification {}", notification, e);
+            return;
+        }
+        maybeOutput.ifPresent(this::post);
+    }
+
+    abstract @NonNull EffectiveModelContext effectiveModel();
+}
diff --git a/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/listeners/DeviceNotificationListenerAdaptor.java b/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/listeners/DeviceNotificationListenerAdaptor.java
new file mode 100644 (file)
index 0000000..3922c0b
--- /dev/null
@@ -0,0 +1,96 @@
+/*
+ * Copyright (c) 2022 Opendaylight, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.restconf.nb.rfc8040.streams.listeners;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.Set;
+import org.eclipse.jdt.annotation.NonNull;
+import org.opendaylight.mdsal.dom.api.DOMMountPointListener;
+import org.opendaylight.mdsal.dom.api.DOMMountPointService;
+import org.opendaylight.mdsal.dom.api.DOMNotificationService;
+import org.opendaylight.restconf.nb.rfc8040.streams.SSESessionHandler;
+import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev140708.NotificationOutputTypeGrouping.NotificationOutputType;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
+import org.opendaylight.yangtools.yang.model.api.stmt.SchemaNodeIdentifier.Absolute;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link DeviceNotificationListenerAdaptor} is responsible to track events on notifications.
+ */
+public final class DeviceNotificationListenerAdaptor extends AbstractNotificationListenerAdaptor implements
+    DOMMountPointListener {
+
+    private static final Logger LOG = LoggerFactory.getLogger(DeviceNotificationListenerAdaptor.class);
+    private final @NonNull EffectiveModelContext effectiveModel;
+
+    private  final @NonNull DOMMountPointService mountPointService;
+
+    private ListenerRegistration<DOMMountPointListener> reg;
+    private final YangInstanceIdentifier instanceIdentifier;
+
+
+    public DeviceNotificationListenerAdaptor(final String streamName, final NotificationOutputType outputType,
+            final EffectiveModelContext effectiveModel, final DOMMountPointService mountPointService,
+            final YangInstanceIdentifier path) {
+        // FIXME: a dummy QName due to contracts
+        super(QName.create("dummy", "dummy"), streamName, outputType);
+        this.effectiveModel = requireNonNull(effectiveModel);
+        this.mountPointService = requireNonNull(mountPointService);
+        instanceIdentifier = requireNonNull(path);
+    }
+
+    public synchronized void listen(final DOMNotificationService notificationService, final Set<Absolute> paths) {
+        if (!isListening()) {
+            setRegistration(notificationService.registerNotificationListener(this, paths));
+            reg = mountPointService.registerProvisionListener(this);
+        }
+    }
+
+    private synchronized void resetListenerRegistration() {
+        if (reg != null) {
+            reg.close();
+            reg = null;
+        }
+    }
+
+    @Override
+    EffectiveModelContext effectiveModel() {
+        return effectiveModel;
+    }
+
+    @Override
+    public void onMountPointCreated(final YangInstanceIdentifier path) {
+        // No-op
+    }
+
+    @Override
+    public void onMountPointRemoved(final YangInstanceIdentifier path) {
+        if (instanceIdentifier.equals(path)) {
+            getSubscribers().forEach(subscriber -> {
+                if (subscriber.isConnected()) {
+                    subscriber.sendDataMessage("Device disconnected");
+                }
+                if (subscriber instanceof SSESessionHandler sseSessionHandler) {
+                    try {
+                        sseSessionHandler.close();
+                    } catch (IllegalStateException e) {
+                        LOG.warn("Ignoring exception while closing sse session");
+                    }
+                }
+            });
+            ListenersBroker.getInstance().removeAndCloseDeviceNotificationListener(this);
+            resetListenerRegistration();
+        }
+    }
+
+}
index 700b1205d64d9d291a5d98f151ddb6aa8b883cc8..a6e3306f444f3cfb2d76ff7adc29ace9312e3518 100644 (file)
@@ -7,7 +7,10 @@
  */
 package org.opendaylight.restconf.nb.rfc8040.streams.listeners;
 
+import static java.util.Objects.requireNonNull;
+
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.MoreObjects.ToStringHelper;
 import java.time.Instant;
 import java.util.Collection;
 import java.util.List;
@@ -28,12 +31,14 @@ import org.slf4j.LoggerFactory;
 /**
  * {@link ListenerAdapter} is responsible to track events, which occurred by changing data in data source.
  */
-public class ListenerAdapter extends AbstractCommonSubscriber<YangInstanceIdentifier, Collection<DataTreeCandidate>>
+public class ListenerAdapter extends AbstractCommonSubscriber<Collection<DataTreeCandidate>>
         implements ClusteredDOMDataTreeChangeListener {
     private static final Logger LOG = LoggerFactory.getLogger(ListenerAdapter.class);
     private static final DataTreeCandidateFormatterFactory JSON_FORMATTER_FACTORY =
             JSONDataTreeCandidateFormatter.createFactory(JSONCodecFactorySupplier.RFC7951);
 
+    private final YangInstanceIdentifier path;
+
     /**
      * Creates new {@link ListenerAdapter} listener specified by path and stream name and register for subscribing.
      *
@@ -44,7 +49,8 @@ public class ListenerAdapter extends AbstractCommonSubscriber<YangInstanceIdenti
     @VisibleForTesting
     public ListenerAdapter(final YangInstanceIdentifier path, final String streamName,
             final NotificationOutputType outputType) {
-        super(path.getLastPathArgument().getNodeType(), streamName, path, outputType, getFormatterFactory(outputType));
+        super(path.getLastPathArgument().getNodeType(), streamName, outputType, getFormatterFactory(outputType));
+        this.path = requireNonNull(path);
     }
 
     private static DataTreeCandidateFormatterFactory getFormatterFactory(final NotificationOutputType outputType) {
@@ -88,7 +94,7 @@ public class ListenerAdapter extends AbstractCommonSubscriber<YangInstanceIdenti
      * @return Path pointed to data in data store.
      */
     public YangInstanceIdentifier getPath() {
-        return path();
+        return path;
     }
 
     /**
@@ -109,4 +115,9 @@ public class ListenerAdapter extends AbstractCommonSubscriber<YangInstanceIdenti
                 new DOMDataTreeIdentifier(datastore, getPath()), this));
         }
     }
+
+    @Override
+    ToStringHelper addToStringAttributes(final ToStringHelper helper) {
+        return super.addToStringAttributes(helper.add("path", path));
+    }
 }
index 3b83c9022512c9e3817eb4a8ecafb17a35e6aa5b..d8f2bd54489f00fb988c69337072bc3f5246c078 100644 (file)
@@ -19,10 +19,12 @@ import java.util.Set;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.locks.StampedLock;
 import java.util.function.Function;
+import org.opendaylight.mdsal.dom.api.DOMMountPointService;
 import org.opendaylight.restconf.nb.rfc8040.rests.utils.RestconfStreamsConstants;
 import org.opendaylight.restconf.nb.rfc8040.utils.RestconfConstants;
 import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev140708.NotificationOutputTypeGrouping.NotificationOutputType;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
 import org.opendaylight.yangtools.yang.model.api.NotificationDefinition;
 import org.opendaylight.yangtools.yang.model.api.stmt.SchemaNodeIdentifier.Absolute;
 import org.slf4j.Logger;
@@ -43,8 +45,11 @@ public final class ListenersBroker {
 
     private final StampedLock dataChangeListenersLock = new StampedLock();
     private final StampedLock notificationListenersLock = new StampedLock();
+    private final StampedLock deviceNotificationListenersLock = new StampedLock();
     private final BiMap<String, ListenerAdapter> dataChangeListeners = HashBiMap.create();
     private final BiMap<String, NotificationListenerAdapter> notificationListeners = HashBiMap.create();
+    private final BiMap<String, DeviceNotificationListenerAdaptor> deviceNotificationListeners = HashBiMap.create();
+
 
     private ListenersBroker() {
 
@@ -115,6 +120,22 @@ public final class ListenersBroker {
         }
     }
 
+    /**
+     * Get listener for device path.
+     *
+     * @param path name.
+     * @return {@link NotificationListenerAdapter} or {@link ListenerAdapter} object wrapped in {@link Optional}
+     *     or {@link Optional#empty()} if listener with specified path doesn't exist.
+     */
+    public Optional<BaseListenerInterface> getDeviceNotificationListenerFor(final String path) {
+        final long stamp = deviceNotificationListenersLock.readLock();
+        try {
+            return Optional.ofNullable(deviceNotificationListeners.get(requireNonNull(path)));
+        } finally {
+            deviceNotificationListenersLock.unlockRead(stamp);
+        }
+    }
+
     /**
      * Get listener for stream-name.
      *
@@ -180,6 +201,30 @@ public final class ListenersBroker {
         }
     }
 
+    /**
+     * Creates new {@link DeviceNotificationListenerAdaptor} listener using input stream name and schema path
+     * if such listener haven't been created yet.
+     *
+     * @param streamName Stream name.
+     * @param outputType Specific type of output for notifications - XML or JSON.
+     * @param refSchemaCtx Schema context of node
+     * @param mountPointService Mount point service
+     * @return Created or existing device notification listener adapter.
+     */
+    public DeviceNotificationListenerAdaptor registerDeviceNotificationListener(final String streamName,
+        final NotificationOutputType outputType, final EffectiveModelContext refSchemaCtx,
+        final DOMMountPointService mountPointService, final YangInstanceIdentifier path) {
+
+        final long stamp = deviceNotificationListenersLock.writeLock();
+        try {
+            return deviceNotificationListeners.computeIfAbsent(streamName,
+                stream -> new DeviceNotificationListenerAdaptor(streamName, outputType, refSchemaCtx,
+                    mountPointService, path));
+        } finally {
+            deviceNotificationListenersLock.unlockWrite(stamp);
+        }
+    }
+
     /**
      * Removal and closing of all data-change-event and notification listeners.
      */
@@ -302,6 +347,27 @@ public final class ListenersBroker {
         }
     }
 
+    /**
+     * Removes and closes device notification listener of type {@link NotificationListenerAdapter}
+     * specified in parameter.
+     *
+     * @param listener Listener to be closed and removed.
+     */
+    @SuppressWarnings("checkstyle:IllegalCatch")
+    public void removeAndCloseDeviceNotificationListener(final DeviceNotificationListenerAdaptor listener) {
+        final long stamp = deviceNotificationListenersLock.writeLock();
+        try {
+            requireNonNull(listener);
+            if (deviceNotificationListeners.inverse().remove(listener) == null) {
+                LOG.warn("There isn't any device notification stream that would match listener adapter {}.", listener);
+            }
+        } catch (final Exception exception) {
+            LOG.error("Device Notification listener {} cannot be closed.", listener, exception);
+        } finally {
+            deviceNotificationListenersLock.unlockWrite(stamp);
+        }
+    }
+
     private void removeAndCloseNotificationListenerTemplate(final NotificationListenerAdapter listener) {
         try {
             requireNonNull(listener).close();
index f954da4da3ebfe5da406eacce2956e77b2e27a59..f0fa51af769ebca0d61bda3a2aaf3ddedcb2068b 100644 (file)
@@ -7,26 +7,19 @@
  */
 package org.opendaylight.restconf.nb.rfc8040.streams.listeners;
 
-import java.time.Instant;
-import java.util.Optional;
-import org.opendaylight.mdsal.dom.api.DOMNotification;
-import org.opendaylight.mdsal.dom.api.DOMNotificationListener;
+import static java.util.Objects.requireNonNull;
+
+import com.google.common.base.MoreObjects.ToStringHelper;
 import org.opendaylight.mdsal.dom.api.DOMNotificationService;
 import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev140708.NotificationOutputTypeGrouping.NotificationOutputType;
-import org.opendaylight.yangtools.yang.data.codec.gson.JSONCodecFactorySupplier;
+import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
 import org.opendaylight.yangtools.yang.model.api.stmt.SchemaNodeIdentifier.Absolute;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * {@link NotificationListenerAdapter} is responsible to track events on notifications.
  */
-public final class NotificationListenerAdapter extends AbstractCommonSubscriber<Absolute, DOMNotification>
-        implements DOMNotificationListener {
-
-    private static final Logger LOG = LoggerFactory.getLogger(NotificationListenerAdapter.class);
-    private static final NotificationFormatterFactory JSON_FORMATTER_FACTORY = JSONNotificationFormatter.createFactory(
-            JSONCodecFactorySupplier.RFC7951);
+public final class NotificationListenerAdapter extends AbstractNotificationListenerAdaptor {
+    private final Absolute path;
 
     /**
      * Set path of listener and stream name.
@@ -36,35 +29,13 @@ public final class NotificationListenerAdapter extends AbstractCommonSubscriber<
      * @param outputType Type of output on notification (JSON or XML).
      */
     NotificationListenerAdapter(final Absolute path, final String streamName, final NotificationOutputType outputType) {
-        super(path.lastNodeIdentifier(), streamName, path, outputType, getFormatterFactory(outputType));
-    }
-
-    private static NotificationFormatterFactory getFormatterFactory(final NotificationOutputType outputType) {
-        return switch (outputType) {
-            case JSON -> JSON_FORMATTER_FACTORY;
-            case XML -> XMLNotificationFormatter.FACTORY;
-        };
+        super(path.lastNodeIdentifier(), streamName, outputType);
+        this.path = requireNonNull(path);
     }
 
     @Override
-    @SuppressWarnings("checkstyle:IllegalCatch")
-    public void onNotification(final DOMNotification notification) {
-        final Instant now = Instant.now();
-        if (!checkStartStop(now)) {
-            return;
-        }
-
-        final Optional<String> maybeOutput;
-        try {
-            maybeOutput = formatter().eventData(databindProvider.currentContext().modelContext(), notification, now,
-                getLeafNodesOnly(), isSkipNotificationData(), getChangedLeafNodesOnly());
-        } catch (Exception e) {
-            LOG.error("Failed to process notification {}", notification, e);
-            return;
-        }
-        if (maybeOutput.isPresent()) {
-            post(maybeOutput.get());
-        }
+    EffectiveModelContext effectiveModel() {
+        return databindProvider.currentContext().modelContext();
     }
 
     /**
@@ -73,7 +44,7 @@ public final class NotificationListenerAdapter extends AbstractCommonSubscriber<
      * @return The configured schema path that points to observing YANG notification schema node.
      */
     public Absolute getSchemaPath() {
-        return path();
+        return path;
     }
 
     public synchronized void listen(final DOMNotificationService notificationService) {
@@ -81,4 +52,9 @@ public final class NotificationListenerAdapter extends AbstractCommonSubscriber<
             setRegistration(notificationService.registerNotificationListener(this, getSchemaPath()));
         }
     }
+
+    @Override
+    ToStringHelper addToStringAttributes(final ToStringHelper helper) {
+        return super.addToStringAttributes(helper.add("path", path));
+    }
 }
index 1bc05d629ea756be516442c54de54041728e649d..97e0ceecf9409bdf5f674e3a877dc445adfade3c 100644 (file)
@@ -36,6 +36,7 @@ import org.mockito.ArgumentCaptor;
 import org.mockito.Mock;
 import org.mockito.junit.MockitoJUnitRunner;
 import org.opendaylight.mdsal.dom.api.DOMMountPoint;
+import org.opendaylight.mdsal.dom.api.DOMMountPointService;
 import org.opendaylight.mdsal.dom.api.DOMRpcException;
 import org.opendaylight.mdsal.dom.api.DOMRpcImplementationNotAvailableException;
 import org.opendaylight.mdsal.dom.api.DOMRpcResult;
@@ -45,6 +46,7 @@ import org.opendaylight.restconf.common.context.InstanceIdentifierContext;
 import org.opendaylight.restconf.common.errors.RestconfDocumentedException;
 import org.opendaylight.restconf.nb.rfc8040.TestRestconfUtils;
 import org.opendaylight.restconf.nb.rfc8040.legacy.NormalizedNodePayload;
+import org.opendaylight.restconf.nb.rfc8040.streams.Configuration;
 import org.opendaylight.yangtools.yang.common.ErrorTag;
 import org.opendaylight.yangtools.yang.common.ErrorType;
 import org.opendaylight.yangtools.yang.common.QName;
@@ -75,6 +77,8 @@ public class RestconfInvokeOperationsServiceImplTest {
     private DOMRpcService rpcService;
     @Mock
     private DOMMountPoint mountPoint;
+    @Mock
+    private DOMMountPointService mountPointService;
     private RestconfInvokeOperationsServiceImpl invokeOperationsService;
 
     @BeforeClass
@@ -84,7 +88,8 @@ public class RestconfInvokeOperationsServiceImplTest {
 
     @Before
     public void setup() {
-        invokeOperationsService = new RestconfInvokeOperationsServiceImpl(rpcService);
+        invokeOperationsService = new RestconfInvokeOperationsServiceImpl(rpcService, mountPointService,
+            mock(Configuration.class));
     }
 
     @Test