Bump upstream dependencies to K-SR2
[transportpce.git] / networkmodel / src / main / java / org / opendaylight / transportpce / networkmodel / NetConfTopologyListener.java
index d11d28695693394690a5786a564b2b916c830bbf..ce15d3377f810b3688b34b49e2cd8eb12b57f8ee 100644 (file)
  */
 package org.opendaylight.transportpce.networkmodel;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.ListenableFuture;
+import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.stream.Collectors;
-
-import javax.annotation.Nonnull;
-
+import java.util.concurrent.ExecutionException;
 import org.opendaylight.mdsal.binding.api.DataBroker;
 import org.opendaylight.mdsal.binding.api.DataObjectModification;
-import org.opendaylight.mdsal.binding.api.DataObjectModification.ModificationType;
 import org.opendaylight.mdsal.binding.api.DataTreeChangeListener;
 import org.opendaylight.mdsal.binding.api.DataTreeModification;
 import org.opendaylight.mdsal.binding.api.MountPoint;
 import org.opendaylight.mdsal.binding.api.NotificationService;
 import org.opendaylight.mdsal.binding.api.RpcConsumerRegistry;
+import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
 import org.opendaylight.transportpce.common.StringConstants;
+import org.opendaylight.transportpce.common.Timeouts;
 import org.opendaylight.transportpce.common.device.DeviceTransactionManager;
+import org.opendaylight.transportpce.common.mapping.PortMapping;
 import org.opendaylight.transportpce.networkmodel.dto.NodeRegistration;
-import org.opendaylight.transportpce.networkmodel.dto.NodeRegistration22;
-import org.opendaylight.transportpce.networkmodel.listeners.AlarmNotificationListener;
-import org.opendaylight.transportpce.networkmodel.listeners.AlarmNotificationListener221;
-import org.opendaylight.transportpce.networkmodel.listeners.DeOperationsListener;
-import org.opendaylight.transportpce.networkmodel.listeners.DeOperationsListener221;
-import org.opendaylight.transportpce.networkmodel.listeners.DeviceListener;
-import org.opendaylight.transportpce.networkmodel.listeners.DeviceListener221;
-import org.opendaylight.transportpce.networkmodel.listeners.TcaListener;
-import org.opendaylight.transportpce.networkmodel.listeners.TcaListener221;
 import org.opendaylight.transportpce.networkmodel.service.NetworkModelService;
-import org.opendaylight.yang.gen.v1.http.org.openroadm.alarm.rev161014.OrgOpenroadmAlarmListener;
-import org.opendaylight.yang.gen.v1.http.org.openroadm.de.operations.rev161014.OrgOpenroadmDeOperationsListener;
-import org.opendaylight.yang.gen.v1.http.org.openroadm.device.rev170206.OrgOpenroadmDeviceListener;
-import org.opendaylight.yang.gen.v1.http.org.openroadm.tca.rev161014.OrgOpenroadmTcaListener;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.CreateSubscriptionInputBuilder;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.CreateSubscriptionOutput;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.NotificationsService;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.StreamNameType;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNode;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNodeConnectionStatus;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.connection.status.available.capabilities.AvailableCapability;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.Netconf;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.Streams;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.streams.Stream;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.device.rev240118.ConnectionOper.ConnectionStatus;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.device.rev240118.connection.oper.available.capabilities.AvailableCapability;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev221225.NetconfNode;
 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
-import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.common.RpcResult;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class NetConfTopologyListener implements DataTreeChangeListener<Node> {
 
     private static final Logger LOG = LoggerFactory.getLogger(NetConfTopologyListener.class);
-
+    private static final String RPC_SERVICE_FAILED = "Failed to get RpcService for node {}";
     private final NetworkModelService networkModelService;
     private final DataBroker dataBroker;
     private final DeviceTransactionManager deviceTransactionManager;
     private final Map<String, NodeRegistration> registrations;
-    private final Map<String, NodeRegistration22> registrations22;
+    private final PortMapping portMapping;
 
-    public NetConfTopologyListener(final NetworkModelService networkModelService, final DataBroker dataBroker,
-             DeviceTransactionManager deviceTransactionManager) {
+    public NetConfTopologyListener(
+            final NetworkModelService networkModelService,
+            final DataBroker dataBroker,
+            DeviceTransactionManager deviceTransactionManager,
+            PortMapping portMapping) {
         this.networkModelService = networkModelService;
         this.dataBroker = dataBroker;
         this.deviceTransactionManager = deviceTransactionManager;
         this.registrations = new ConcurrentHashMap<>();
-        this.registrations22 = new ConcurrentHashMap<>();
+        this.portMapping = portMapping;
+    }
+
+    @Override
+    public void onDataTreeChanged(Collection<DataTreeModification<Node>> changes) {
+        LOG.info("onDataTreeChanged - {}", this.getClass().getSimpleName());
+        for (DataTreeModification<Node> change : changes) {
+            DataObjectModification<Node> rootNode = change.getRootNode();
+            if (rootNode.getDataBefore() == null) {
+                continue;
+            }
+            String nodeId = rootNode.getDataBefore().key().getNodeId().getValue();
+            NetconfNode netconfNodeBefore = rootNode.getDataBefore().augmentation(NetconfNode.class);
+            switch (rootNode.getModificationType()) {
+                case DELETE:
+                    if (this.networkModelService.deleteOpenRoadmnode(nodeId)) {
+                        onDeviceDisConnected(nodeId);
+                        LOG.info("Device {} correctly disconnected from controller", nodeId);
+                    }
+                    break;
+                case WRITE:
+                    NetconfNode netconfNodeAfter = rootNode.getDataAfter().augmentation(NetconfNode.class);
+                    if (ConnectionStatus.Connecting.equals(netconfNodeBefore.getConnectionStatus())
+                            && ConnectionStatus.Connected.equals(netconfNodeAfter.getConnectionStatus())) {
+                        LOG.info("Connecting Node: {}", nodeId);
+                        Optional<AvailableCapability> deviceCapabilityOpt =
+                            netconfNodeAfter.getAvailableCapabilities().getAvailableCapability().stream()
+                                .filter(cp -> cp.getCapability().contains(StringConstants.OPENROADM_DEVICE_MODEL_NAME))
+                                .sorted((c1, c2) -> c2.getCapability().compareTo(c1.getCapability()))
+                                .findFirst();
+                        if (deviceCapabilityOpt.isEmpty()) {
+                            LOG.error("Unable to get openroadm-device-capability");
+                            return;
+                        }
+                        this.networkModelService
+                            .createOpenRoadmNode(nodeId, deviceCapabilityOpt.orElseThrow().getCapability());
+                        onDeviceConnected(nodeId, deviceCapabilityOpt.orElseThrow().getCapability());
+                        LOG.info("Device {} correctly connected to controller", nodeId);
+                    }
+                    if (ConnectionStatus.Connected.equals(netconfNodeBefore.getConnectionStatus())
+                            && ConnectionStatus.Connecting.equals(netconfNodeAfter.getConnectionStatus())) {
+                        LOG.warn("Node: {} is being disconnected", nodeId);
+                    }
+                    break;
+                default:
+                    LOG.debug("Unknown modification type {}", rootNode.getModificationType().name());
+                    break;
+            }
+        }
     }
 
     private void onDeviceConnected(final String nodeId, String openRoadmVersion) {
         LOG.info("onDeviceConnected: {}", nodeId);
-        LOG.info(StringConstants.OPENROADM_DEVICE_VERSION_1_2_1);
-        LOG.info(openRoadmVersion);
         Optional<MountPoint> mountPointOpt = this.deviceTransactionManager.getDeviceMountPoint(nodeId);
-        MountPoint mountPoint;
-        if (mountPointOpt.isPresent()) {
-            mountPoint = mountPointOpt.get();
-        } else {
+        if (mountPointOpt.isEmpty()) {
             LOG.error("Failed to get mount point for node {}", nodeId);
             return;
         }
-
-        final Optional<NotificationService> notificationService =
-                mountPoint.getService(NotificationService.class);
-        if (!notificationService.isPresent()) {
-            LOG.error("Failed to get RpcService for node {}", nodeId);
+        MountPoint mountPoint = mountPointOpt.orElseThrow();
+        final Optional<NotificationService> notificationService = mountPoint.getService(NotificationService.class);
+        if (notificationService.isEmpty()) {
+            LOG.error(RPC_SERVICE_FAILED, nodeId);
             return;
         }
+        NodeRegistration nodeRegistration =
+            new NodeRegistration(
+                nodeId, openRoadmVersion, notificationService.orElseThrow(), this.dataBroker, this.portMapping);
+        nodeRegistration.registerListeners();
+        registrations.put(nodeId, nodeRegistration);
 
-        if (openRoadmVersion.equals(StringConstants.OPENROADM_DEVICE_VERSION_1_2_1)) {
-
-            final OrgOpenroadmAlarmListener alarmListener = new AlarmNotificationListener(this.dataBroker);
-            LOG.info("Registering notification listener on OrgOpenroadmAlarmListener for node: {}", nodeId);
-            final ListenerRegistration<OrgOpenroadmAlarmListener> accessAlarmNotificationListenerRegistration =
-                notificationService.get().registerNotificationListener(alarmListener);
-
-            final OrgOpenroadmDeOperationsListener deOperationsListener = new DeOperationsListener();
-            LOG.info("Registering notification listener on OrgOpenroadmDeOperationsListener for node: {}", nodeId);
-            final ListenerRegistration<OrgOpenroadmDeOperationsListener>
-                accessDeOperationasNotificationListenerRegistration =
-                notificationService.get().registerNotificationListener(deOperationsListener);
-
-            final OrgOpenroadmDeviceListener deviceListener = new DeviceListener();
-            LOG.info("Registering notification listener on OrgOpenroadmDeviceListener for node: {}", nodeId);
-            final ListenerRegistration<OrgOpenroadmDeviceListener> accessDeviceNotificationListenerRegistration =
-                notificationService.get().registerNotificationListener(deviceListener);
-
-            TcaListener tcaListener = new TcaListener();
-            LOG.info("Registering notification listener on OrgOpenroadmTcaListener for node: {}", nodeId);
-            final ListenerRegistration<OrgOpenroadmTcaListener> accessTcaNotificationListenerRegistration =
-                notificationService.get().registerNotificationListener(tcaListener);
-
-            String streamName = "NETCONF";
-
-            if (streamName == null) {
-                streamName = "OPENROADM";
-            }
-
-            final Optional<RpcConsumerRegistry> service = mountPoint.getService(RpcConsumerRegistry.class);
-            if (service.isPresent()) {
-                final NotificationsService rpcService = service.get().getRpcService(NotificationsService.class);
-                if (rpcService == null) {
-                    LOG.error("Failed to get RpcService for node {}", nodeId);
-                } else {
-                    final CreateSubscriptionInputBuilder createSubscriptionInputBuilder =
-                        new CreateSubscriptionInputBuilder();
-                    createSubscriptionInputBuilder.setStream(new StreamNameType(streamName));
-                    LOG.info("Triggering notification stream {} for node {}", streamName, nodeId);
-                    rpcService.createSubscription(createSubscriptionInputBuilder.build());
-                }
-            } else {
-                LOG.error("Failed to get RpcService for node {}", nodeId);
-            }
-            NodeRegistration nodeRegistration = new NodeRegistration(nodeId,
-                accessAlarmNotificationListenerRegistration,
-                accessDeOperationasNotificationListenerRegistration, accessDeviceNotificationListenerRegistration,
-                null, accessTcaNotificationListenerRegistration);
-            registrations.put(nodeId, nodeRegistration);
-
-        } else if (openRoadmVersion.equals(StringConstants.OPENROADM_DEVICE_VERSION_2_2_1)) {
-            final org.opendaylight.yang.gen.v1.http.org.openroadm.alarm.rev181019.OrgOpenroadmAlarmListener
-                alarmListener = new AlarmNotificationListener221(dataBroker);
-            LOG.info("Registering notification listener on OrgOpenroadmAlarmListener for node: {}", nodeId);
-            final ListenerRegistration<org.opendaylight.yang.gen.v1.http.org.openroadm.alarm.rev181019
-                .OrgOpenroadmAlarmListener> accessAlarmNotificationListenerRegistration =
-                notificationService.get().registerNotificationListener(alarmListener);
-
-            final org.opendaylight.yang.gen.v1.http.org.openroadm.de.operations.rev181019
-                .OrgOpenroadmDeOperationsListener deOperationsListener = new DeOperationsListener221();
-            LOG.info("Registering notification listener on OrgOpenroadmDeOperationsListener for node: {}", nodeId);
-            final ListenerRegistration<org.opendaylight.yang.gen.v1.http.org.openroadm.de.operations.rev181019
-                .OrgOpenroadmDeOperationsListener> accessDeOperationasNotificationListenerRegistration =
-                notificationService.get().registerNotificationListener(deOperationsListener);
-
-            final org.opendaylight.yang.gen.v1.http.org.openroadm.device.rev181019.OrgOpenroadmDeviceListener
-                deviceListener = new DeviceListener221();
-            LOG.info("Registering notification listener on OrgOpenroadmDeviceListener for node: {}", nodeId);
-            final ListenerRegistration<org.opendaylight.yang.gen.v1.http.org.openroadm.device.rev181019
-                .OrgOpenroadmDeviceListener> accessDeviceNotificationListenerRegistration =
-                notificationService.get().registerNotificationListener(deviceListener);
-
-            final org.opendaylight.yang.gen.v1.http.org.openroadm.tca.rev181019.OrgOpenroadmTcaListener
-                tcaListener = new TcaListener221();
-            LOG.info("Registering notification listener on OrgOpenroadmTcaListener for node: {}", nodeId);
-            final ListenerRegistration<org.opendaylight.yang.gen.v1.http.org.openroadm.tca.rev181019
-                .OrgOpenroadmTcaListener> accessTcaNotificationListenerRegistration =
-                notificationService.get().registerNotificationListener(tcaListener);
-
-
-            String streamName = "NETCONF";
-            if (streamName == null) {
-                streamName = "OPENROADM";
-            }
-            final Optional<RpcConsumerRegistry> service = mountPoint.getService(RpcConsumerRegistry.class);
-            if (service.isPresent()) {
-                final NotificationsService rpcService = service.get().getRpcService(NotificationsService.class);
-                if (rpcService == null) {
-                    LOG.error("Failed to get RpcService for node {}", nodeId);
-                } else {
-                    final CreateSubscriptionInputBuilder createSubscriptionInputBuilder =
-                        new CreateSubscriptionInputBuilder();
-                    createSubscriptionInputBuilder.setStream(new StreamNameType(streamName));
-                    LOG.info("Triggering notification stream {} for node {}", streamName, nodeId);
-                    rpcService.createSubscription(createSubscriptionInputBuilder.build());
-                }
-            } else {
-                LOG.error("Failed to get RpcService for node {}", nodeId);
-            }
-            NodeRegistration22 nodeRegistration22 = new NodeRegistration22(nodeId,
-                accessAlarmNotificationListenerRegistration,
-                accessDeOperationasNotificationListenerRegistration, accessDeviceNotificationListenerRegistration,
-                null, accessTcaNotificationListenerRegistration);
-            registrations22.put(nodeId, nodeRegistration22);
-
-        }
-
+        subscribeStream(mountPoint, nodeId);
     }
 
     private void onDeviceDisConnected(final String nodeId) {
         LOG.info("onDeviceDisConnected: {}", nodeId);
-        NodeRegistration nodeRegistration = this.registrations.remove(nodeId);
-        if (nodeRegistration != null) {
-            nodeRegistration.getAccessAlarmNotificationListenerRegistration().close();
-            nodeRegistration.getAccessDeOperationasNotificationListenerRegistration().close();
-            nodeRegistration.getAccessDeviceNotificationListenerRegistration().close();
-            nodeRegistration.getAccessTcaNotificationListenerRegistration().close();
-        }
+        this.registrations.remove(nodeId).unregisterListeners();
     }
 
-    @Override
-    @SuppressWarnings("checkstyle:FallThrough")
-    public void onDataTreeChanged(@Nonnull Collection<DataTreeModification<Node>> changes) {
-        LOG.info("onDataTreeChanged");
-        for (DataTreeModification<Node> change : changes) {
-            DataObjectModification<Node> rootNode = change.getRootNode();
-            if ((rootNode.getDataAfter() == null) && (rootNode.getModificationType() != ModificationType.DELETE)) {
-                LOG.error("rootNode.getDataAfter is null : Node not connected via Netconf protocol");
-                continue;
-            }
-            if (rootNode.getModificationType() == ModificationType.DELETE) {
-                if (rootNode.getDataBefore() != null) {
-                    String nodeId = rootNode.getDataBefore().key().getNodeId().getValue();
-                    LOG.info("Node {} deleted", nodeId);
-                    this.networkModelService.deleteOpenRoadmnode(nodeId);
-                    onDeviceDisConnected(nodeId);
-                } else {
-                    LOG.error("rootNode.getDataBefore is null !");
-                }
-                continue;
-            }
-            String nodeId = rootNode.getDataAfter().key().getNodeId().getValue();
-            NetconfNode netconfNode = rootNode.getDataAfter().augmentation(NetconfNode.class);
-
-            if ((netconfNode != null) && !StringConstants.DEFAULT_NETCONF_NODEID.equals(nodeId)) {
-                switch (rootNode.getModificationType()) {
-                    case WRITE:
-                        LOG.info("Node added: {}", nodeId);
-                    case SUBTREE_MODIFIED:
-                        NetconfNodeConnectionStatus.ConnectionStatus connectionStatus =
-                                netconfNode.getConnectionStatus();
-                        try {
-                            List<AvailableCapability> deviceCapabilities = netconfNode.getAvailableCapabilities()
-                                .getAvailableCapability().stream().filter(cp -> cp.getCapability()
-                                .contains(StringConstants.OPENROADM_DEVICE_MODEL_NAME)).collect(Collectors.toList());
-                            if (!deviceCapabilities.isEmpty()) {
-                                Collections.sort(deviceCapabilities, (cp0, cp1) -> cp1.getCapability()
-                                    .compareTo(cp0.getCapability()));
-                                LOG.info("OpenROADM node detected: {} {}", nodeId, connectionStatus.name());
-                                switch (connectionStatus) {
-                                    case Connected:
-                                        this.networkModelService.createOpenRoadmNode(nodeId, deviceCapabilities.get(0)
-                                            .getCapability());
-                                        onDeviceConnected(nodeId,deviceCapabilities.get(0).getCapability());
-                                        break;
-                                    case Connecting:
-                                    case UnableToConnect:
-                                        this.networkModelService.setOpenRoadmNodeStatus(nodeId, connectionStatus);
-                                        onDeviceDisConnected(nodeId);
-                                        break;
-                                    default:
-                                        LOG.warn("Unsupported device state {}", connectionStatus.getName());
-                                        break;
-                                }
-                            }
-
-                        } catch (NullPointerException e) {
-                            LOG.error("Cannot get available Capabilities");
-                        }
-                        break;
-                    default:
-                        LOG.warn("Unexpected connection status : {}", rootNode.getModificationType());
-                        break;
-                }
+    private boolean subscribeStream(MountPoint mountPoint, String nodeId) {
+        final Optional<RpcConsumerRegistry> service = mountPoint.getService(RpcConsumerRegistry.class);
+        if (service.isEmpty()) {
+            return false;
+        }
+        final NotificationsService rpcService = service.orElseThrow().getRpcService(NotificationsService.class);
+        if (rpcService == null) {
+            LOG.error(RPC_SERVICE_FAILED, nodeId);
+            return false;
+        }
+        // Set the default stream as OPENROADM
+        for (String streamName : getSupportedStream(nodeId)) {
+            LOG.info("Triggering notification stream {} for node {}", streamName, nodeId);
+            ListenableFuture<RpcResult<CreateSubscriptionOutput>> subscription =
+                rpcService.createSubscription(
+                    new CreateSubscriptionInputBuilder().setStream(new StreamNameType(streamName)).build());
+            if (checkSupportedStream(streamName, subscription)) {
+                return true;
             }
         }
+        return false;
     }
 
+    @VisibleForTesting
+    public NetConfTopologyListener(
+            final NetworkModelService networkModelService,
+            final DataBroker dataBroker,
+            DeviceTransactionManager deviceTransactionManager,
+            PortMapping portMapping,
+            Map<String, NodeRegistration> registrations) {
+        this.networkModelService = networkModelService;
+        this.dataBroker = dataBroker;
+        this.deviceTransactionManager = deviceTransactionManager;
+        this.portMapping = portMapping;
+        this.registrations = registrations;
+    }
 
-    /*private String getSupportedStream(String nodeId) {
-        InstanceIdentifier<Streams> streamsIID = InstanceIdentifier.create(Netconf.class).child(Streams.class);
+    private boolean checkSupportedStream(
+            String streamName,
+            ListenableFuture<RpcResult<CreateSubscriptionOutput>> subscription) {
+        boolean subscriptionSuccessful = false;
         try {
-            Optional<Streams> ordmInfoObject =
-                    this.deviceTransactionManager.getDataFromDevice(nodeId, LogicalDatastoreType.OPERATIONAL,
-                            streamsIID, Timeouts.DEVICE_READ_TIMEOUT, Timeouts.DEVICE_READ_TIMEOUT_UNIT);
-            if (!ordmInfoObject.isPresent()) {
-                LOG.error("Get Stream RPC is not supported");
-                return "NETCONF";
-            }
-            for (org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf
-                        .streams.Stream strm : ordmInfoObject.get().getStream()) {
+            // Using if condition does not work, since we need to handle exceptions
+            subscriptionSuccessful = subscription.get().isSuccessful();
+            LOG.info("{} subscription is {}", streamName, subscriptionSuccessful);
+        } catch (InterruptedException | ExecutionException e) {
+            LOG.error("Error during subscription to stream {}", streamName, e);
+        }
+        return subscriptionSuccessful;
+    }
 
-                if ("OPENROADM".equalsIgnoreCase(strm.getName().getValue())) {
-                    return strm.getName().getValue().toUpperCase();
-                }
+    private List<String> getSupportedStream(String nodeId) {
+        InstanceIdentifier<Streams> streamsIID = InstanceIdentifier.create(Netconf.class).child(Streams.class);
+        Optional<Streams> ordmInfoObject =
+                deviceTransactionManager.getDataFromDevice(nodeId, LogicalDatastoreType.OPERATIONAL, streamsIID,
+                        Timeouts.DEVICE_READ_TIMEOUT, Timeouts.DEVICE_READ_TIMEOUT_UNIT);
+        if (ordmInfoObject == null || ordmInfoObject.isEmpty() || ordmInfoObject.orElseThrow().getStream().isEmpty()) {
+            LOG.error("List of streams supports by device is not present");
+            return List.of("OPENROADM","NETCONF");
+        }
+        List<String> streams = new ArrayList<>();
+        List<String> netconfStreams = new ArrayList<>();
+        for (Stream strm : ordmInfoObject.orElseThrow().getStream().values()) {
+            LOG.debug("Streams are {}", strm);
+            if ("OPENROADM".equalsIgnoreCase(strm.getName().getValue())) {
+                streams.add(strm.getName().getValue());
+            } else if ("NETCONF".equalsIgnoreCase(strm.getName().getValue())) {
+                netconfStreams.add(strm.getName().getValue());
             }
-            return "NETCONF";
-        } catch (NullPointerException ex) {
-            LOG.error("NullPointerException thrown while getting Info from a non Open ROADM device {}", nodeId);
-            return "NETCONF";
         }
-    }*/
+        // If OpenROADM streams are not supported, try NETCONF streams subscription
+        streams.addAll(netconfStreams);
+        return
+            streams.isEmpty()
+                ? List.of("OPENROADM","NETCONF")
+                : streams;
+    }
+
 }