- @Override
- @SuppressWarnings("checkstyle:FallThrough")
- public void onDataTreeChanged(@Nonnull Collection<DataTreeModification<Node>> changes) {
- LOG.info("onDataTreeChanged");
- for (DataTreeModification<Node> change : changes) {
- DataObjectModification<Node> rootNode = change.getRootNode();
- if ((rootNode.getDataAfter() == null) && (rootNode.getModificationType() != ModificationType.DELETE)) {
- LOG.error("rootNode.getDataAfter is null : Node not connected via Netconf protocol");
- continue;
- }
- if (rootNode.getModificationType() == ModificationType.DELETE) {
- if (rootNode.getDataBefore() != null) {
- String nodeId = rootNode.getDataBefore().key().getNodeId().getValue();
- LOG.info("Node {} deleted", nodeId);
- this.networkModelService.deleteOpenROADMnode(nodeId);
- onDeviceDisConnected(nodeId);
- } else {
- LOG.error("rootNode.getDataBefore is null !");
- }
- continue;
- }
- String nodeId = rootNode.getDataAfter().key().getNodeId().getValue();
- NetconfNode netconfNode = rootNode.getDataAfter().augmentation(NetconfNode.class);
-
- if ((netconfNode != null) && !StringConstants.DEFAULT_NETCONF_NODEID.equals(nodeId)) {
- switch (rootNode.getModificationType()) {
- case WRITE:
- LOG.info("Node added: {}", nodeId);
- case SUBTREE_MODIFIED:
- NetconfNodeConnectionStatus.ConnectionStatus connectionStatus =
- netconfNode.getConnectionStatus();
- try {
- long count = netconfNode.getAvailableCapabilities().getAvailableCapability().stream()
- .filter(cp -> cp.getCapability().contains(StringConstants.OPENROADM_DEVICE_MODEL_NAME))
- .count();
- if (count > 0) {
- LOG.info("OpenROADM node detected: {} {}", nodeId, connectionStatus.name());
- switch (connectionStatus) {
- case Connected:
- this.networkModelService.createOpenROADMnode(nodeId);
- onDeviceConnected(nodeId);
- break;
- case Connecting:
- case UnableToConnect:
- this.networkModelService.setOpenROADMnodeStatus(nodeId, connectionStatus);
- onDeviceDisConnected(nodeId);
- break;
- default:
- LOG.warn("Unsupported device state {}", connectionStatus.getName());
- break;
- }
- }
- } catch (NullPointerException e) {
- LOG.error("Cannot get available Capabilities");
- }
- break;
- default:
- LOG.warn("Unexpected connection status : {}", rootNode.getModificationType());
- break;
- }
+ private boolean subscribeStream(MountPoint mountPoint, String nodeId) {
+ final Optional<RpcConsumerRegistry> service = mountPoint.getService(RpcConsumerRegistry.class);
+ if (service.isEmpty()) {
+ return false;
+ }
+ final NotificationsService rpcService = service.orElseThrow().getRpcService(NotificationsService.class);
+ if (rpcService == null) {
+ LOG.error(RPC_SERVICE_FAILED, nodeId);
+ return false;
+ }
+ // Set the default stream as OPENROADM
+ for (String streamName : getSupportedStream(nodeId)) {
+ LOG.info("Triggering notification stream {} for node {}", streamName, nodeId);
+ ListenableFuture<RpcResult<CreateSubscriptionOutput>> subscription =
+ rpcService.createSubscription(
+ new CreateSubscriptionInputBuilder().setStream(new StreamNameType(streamName)).build());
+ if (checkSupportedStream(streamName, subscription)) {
+ return true;