From 5b61d96f9251589244940215ee9f62e7ff204d11 Mon Sep 17 00:00:00 2001 From: Javier Errea Date: Fri, 22 Oct 2021 15:40:49 +0200 Subject: [PATCH] Implementation of T-API notification rpcs - Tapi notification rpcs implementation - tapi notification de-serializer - Add ENABLE_AUTO_COMMIT_CONFIG to kafka subscriber configuration & and TopicPartition 0 to ConsumerRecords to enable to read TapiNotifications JIRA: TRNSPRTPCE-560 Signed-off-by: errea Change-Id: Ic49522b43c6351835eec67001e42887392a71238 --- .../nbinotifications/consumer/Subscriber.java | 4 +- .../impl/NbiNotificationsImpl.java | 346 ++++++++++++++---- .../impl/NbiNotificationsProvider.java | 2 +- .../TapiNotificationDeserializer.java | 132 +++++++ .../nbinotifications/utils/TopicManager.java | 12 + .../impl/NbiNotificationsImplTest.java | 8 +- 6 files changed, 424 insertions(+), 80 deletions(-) create mode 100644 nbinotifications/src/main/java/org/opendaylight/transportpce/nbinotifications/serialization/TapiNotificationDeserializer.java diff --git a/nbinotifications/src/main/java/org/opendaylight/transportpce/nbinotifications/consumer/Subscriber.java b/nbinotifications/src/main/java/org/opendaylight/transportpce/nbinotifications/consumer/Subscriber.java index 7ebc0bdf6..154cbb80f 100644 --- a/nbinotifications/src/main/java/org/opendaylight/transportpce/nbinotifications/consumer/Subscriber.java +++ b/nbinotifications/src/main/java/org/opendaylight/transportpce/nbinotifications/consumer/Subscriber.java @@ -18,6 +18,7 @@ import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.StringDeserializer; import org.eclipse.jdt.annotation.NonNull; import org.opendaylight.transportpce.common.converter.JsonStringConverter; @@ -39,6 +40,7 @@ public class Subscriber { Properties propsConsumer = NbiNotificationsUtils.loadProperties("subscriber.properties"); propsConsumer.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); propsConsumer.put(ConsumerConfig.CLIENT_ID_CONFIG, id); + propsConsumer.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); propsConsumer.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); propsConsumer.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG , deserializerConf); propsConsumer.put(ConfigConstants.CONVERTER , deserializer); @@ -55,7 +57,7 @@ public class Subscriber { final ConsumerRecords consumerRecords = consumer.poll(Duration.ofMillis(1000)); List notificationServiceList = new ArrayList<>(); YangInstanceIdentifier.of(name); - for (ConsumerRecord record : consumerRecords) { + for (ConsumerRecord record : consumerRecords.records(new TopicPartition(topicName, 0))) { if (record.value() != null) { notificationServiceList.add(record.value()); } diff --git a/nbinotifications/src/main/java/org/opendaylight/transportpce/nbinotifications/impl/NbiNotificationsImpl.java b/nbinotifications/src/main/java/org/opendaylight/transportpce/nbinotifications/impl/NbiNotificationsImpl.java index ea5715f7d..0a23c16e0 100644 --- a/nbinotifications/src/main/java/org/opendaylight/transportpce/nbinotifications/impl/NbiNotificationsImpl.java +++ b/nbinotifications/src/main/java/org/opendaylight/transportpce/nbinotifications/impl/NbiNotificationsImpl.java @@ -8,10 +8,13 @@ package org.opendaylight.transportpce.nbinotifications.impl; import com.google.common.util.concurrent.ListenableFuture; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.NoSuchElementException; import java.util.Optional; import java.util.Set; import java.util.UUID; @@ -22,6 +25,7 @@ import org.opendaylight.transportpce.common.network.NetworkTransactionService; import org.opendaylight.transportpce.nbinotifications.consumer.Subscriber; import org.opendaylight.transportpce.nbinotifications.serialization.NotificationAlarmServiceDeserializer; import org.opendaylight.transportpce.nbinotifications.serialization.NotificationServiceDeserializer; +import org.opendaylight.transportpce.nbinotifications.serialization.TapiNotificationDeserializer; import org.opendaylight.transportpce.nbinotifications.utils.TopicManager; import org.opendaylight.yang.gen.v1.nbi.notifications.rev211013.GetNotificationsAlarmServiceInput; import org.opendaylight.yang.gen.v1.nbi.notifications.rev211013.GetNotificationsAlarmServiceOutput; @@ -32,6 +36,7 @@ import org.opendaylight.yang.gen.v1.nbi.notifications.rev211013.GetNotifications import org.opendaylight.yang.gen.v1.nbi.notifications.rev211013.NbiNotificationsService; import org.opendaylight.yang.gen.v1.nbi.notifications.rev211013.NotificationAlarmService; import org.opendaylight.yang.gen.v1.nbi.notifications.rev211013.NotificationProcessService; +import org.opendaylight.yang.gen.v1.nbi.notifications.rev211013.NotificationTapiService; import org.opendaylight.yang.gen.v1.nbi.notifications.rev211013.get.notifications.alarm.service.output.NotificationsAlarmService; import org.opendaylight.yang.gen.v1.nbi.notifications.rev211013.get.notifications.process.service.output.NotificationsProcessService; import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.common.rev181210.Context; @@ -42,14 +47,19 @@ import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev18121 import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev181210.CreateNotificationSubscriptionServiceOutputBuilder; import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev181210.DeleteNotificationSubscriptionServiceInput; import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev181210.DeleteNotificationSubscriptionServiceOutput; +import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev181210.DeleteNotificationSubscriptionServiceOutputBuilder; import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev181210.GetNotificationListInput; import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev181210.GetNotificationListOutput; +import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev181210.GetNotificationListOutputBuilder; import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev181210.GetNotificationSubscriptionServiceDetailsInput; import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev181210.GetNotificationSubscriptionServiceDetailsOutput; +import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev181210.GetNotificationSubscriptionServiceDetailsOutputBuilder; import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev181210.GetNotificationSubscriptionServiceListInput; import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev181210.GetNotificationSubscriptionServiceListOutput; +import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev181210.GetNotificationSubscriptionServiceListOutputBuilder; import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev181210.GetSupportedNotificationTypesInput; import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev181210.GetSupportedNotificationTypesOutput; +import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev181210.GetSupportedNotificationTypesOutputBuilder; import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev181210.NotificationType; import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev181210.ObjectType; import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev181210.TapiNotificationService; @@ -59,6 +69,9 @@ import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev18121 import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev181210.context.NotificationContextBuilder; import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev181210.create.notification.subscription.service.output.SubscriptionService; import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev181210.create.notification.subscription.service.output.SubscriptionServiceBuilder; +import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev181210.get.notification.list.output.Notification; +import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev181210.get.notification.list.output.NotificationKey; +import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev181210.get.notification.subscription.service.list.output.SubscriptionServiceKey; import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev181210.notification.context.NotifSubscription; import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev181210.notification.context.NotifSubscriptionBuilder; import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev181210.notification.context.NotifSubscriptionKey; @@ -75,15 +88,18 @@ public class NbiNotificationsImpl implements NbiNotificationsService, TapiNotifi private static final Logger LOG = LoggerFactory.getLogger(NbiNotificationsImpl.class); private final JsonStringConverter converterService; private final JsonStringConverter converterAlarmService; + private final JsonStringConverter converterTapiService; private final String server; private final NetworkTransactionService networkTransactionService; private final TopicManager topicManager; public NbiNotificationsImpl(JsonStringConverter converterService, - JsonStringConverter converterAlarmService, String server, + JsonStringConverter converterAlarmService, + JsonStringConverter converterTapiService, String server, NetworkTransactionService networkTransactionService, TopicManager topicManager) { this.converterService = converterService; this.converterAlarmService = converterAlarmService; + this.converterTapiService = converterTapiService; this.server = server; this.networkTransactionService = networkTransactionService; this.topicManager = topicManager; @@ -125,111 +141,289 @@ public class NbiNotificationsImpl implements NbiNotificationsService, TapiNotifi @Override public ListenableFuture> - getSupportedNotificationTypes(GetSupportedNotificationTypesInput input) { - return null; + getSupportedNotificationTypes(GetSupportedNotificationTypesInput input) { + NotificationContext notificationContext = getNotificationContext(); + if (notificationContext == null) { + return RpcResultBuilder.failed() + .withError(ErrorType.APPLICATION, "Couldnt get Notification Context from Datastore") + .buildFuture(); + } + if (notificationContext.getNotifSubscription() == null) { + return RpcResultBuilder.success(new GetSupportedNotificationTypesOutputBuilder() + .setSupportedNotificationTypes(new HashSet<>()) + .setSupportedObjectTypes(new HashSet<>()).build()).buildFuture(); + } + Set notificationTypeList = new HashSet<>(); + Set objectTypeList = new HashSet<>(); + for (NotifSubscription notifSubscription:notificationContext.getNotifSubscription().values()) { + if (notifSubscription.getSupportedNotificationTypes() != null) { + notificationTypeList.addAll(notifSubscription.getSupportedNotificationTypes()); + } + if (notifSubscription.getSupportedObjectTypes() != null) { + objectTypeList.addAll(notifSubscription.getSupportedObjectTypes()); + } + } + return RpcResultBuilder.success(new GetSupportedNotificationTypesOutputBuilder() + .setSupportedNotificationTypes(notificationTypeList) + .setSupportedObjectTypes(objectTypeList).build()).buildFuture(); } @Override public ListenableFuture> createNotificationSubscriptionService(CreateNotificationSubscriptionServiceInput input) { - try { - for (Uuid uuid:input.getSubscriptionFilter().getRequestedObjectIdentifier()) { - LOG.info("Adding T-API topic: {} to Kafka server", uuid.getValue()); - this.topicManager.addTapiTopic(uuid.getValue()); - } + for (Uuid uuid:input.getSubscriptionFilter().getRequestedObjectIdentifier()) { + LOG.info("Adding T-API topic: {} to Kafka server", uuid.getValue()); + this.topicManager.addTapiTopic(uuid.getValue()); + } + SubscriptionFilter subscriptionFilter = new SubscriptionFilterBuilder() + .setName(input.getSubscriptionFilter().getName()) + .setLocalId(input.getSubscriptionFilter().getLocalId()) + .setIncludeContent(input.getSubscriptionFilter().getIncludeContent()) + .setRequestedNotificationTypes(input.getSubscriptionFilter().getRequestedNotificationTypes()) + .setRequestedLayerProtocols(input.getSubscriptionFilter().getRequestedLayerProtocols()) + .setRequestedObjectIdentifier(input.getSubscriptionFilter().getRequestedObjectIdentifier()) + .setRequestedObjectTypes(input.getSubscriptionFilter().getRequestedObjectTypes()) + .build(); + Uuid notifSubscriptionUuid = new Uuid(UUID.randomUUID().toString()); + SubscriptionService subscriptionService = new SubscriptionServiceBuilder() + .setSubscriptionFilter(subscriptionFilter) + .setSubscriptionState(input.getSubscriptionState()) + .setUuid(notifSubscriptionUuid) + .build(); - InstanceIdentifier notificationcontextIID = - InstanceIdentifier.builder(Context.class).augmentation(Context1.class) - .child(org.opendaylight.yang.gen.v1.urn - .onf.otcc.yang.tapi.notification.rev181210.context.NotificationContext.class) - .build(); - Optional notificationContextOptional - = this.networkTransactionService.read(LogicalDatastoreType.OPERATIONAL, notificationcontextIID).get(); - if (notificationContextOptional.isEmpty()) { - LOG.error("Could not create TAPI notification subscription service"); - return RpcResultBuilder.failed() - .withError(ErrorType.RPC, "Could not read notification context") - .buildFuture(); - } - SubscriptionFilter subscriptionFilter = new SubscriptionFilterBuilder() - .setName(input.getSubscriptionFilter().getName()) - .setLocalId(input.getSubscriptionFilter().getLocalId()) - .setIncludeContent(input.getSubscriptionFilter().getIncludeContent()) - .setRequestedNotificationTypes(input.getSubscriptionFilter().getRequestedNotificationTypes()) - .setRequestedLayerProtocols(input.getSubscriptionFilter().getRequestedLayerProtocols()) - .setRequestedObjectIdentifier(input.getSubscriptionFilter().getRequestedObjectIdentifier()) - .setRequestedObjectTypes(input.getSubscriptionFilter().getRequestedObjectTypes()) - .build(); - SubscriptionService subscriptionService = new SubscriptionServiceBuilder() - .setSubscriptionFilter(subscriptionFilter) - .setSubscriptionState(input.getSubscriptionState()).build(); - Uuid notifSubscriptionUuid = new Uuid(UUID.randomUUID().toString()); - NotifSubscriptionKey notifSubscriptionKey = new NotifSubscriptionKey(notifSubscriptionUuid); - Set notificationTypes = (subscriptionFilter.getRequestedNotificationTypes() != null) - ? subscriptionFilter.getRequestedNotificationTypes() - : new HashSet<>(List.of(NotificationType.ALARMEVENT)); - Set objectTypes = (subscriptionFilter.getRequestedObjectTypes() != null) - ? subscriptionFilter.getRequestedObjectTypes() - : new HashSet<>(List.of(ObjectType.CONNECTIVITYSERVICE)); - NotifSubscription notifSubscription = new NotifSubscriptionBuilder() - .setSubscriptionState(subscriptionService.getSubscriptionState()) - .setSubscriptionFilter(subscriptionService.getSubscriptionFilter()) - .setUuid(notifSubscriptionUuid) - .setSupportedNotificationTypes(notificationTypes) - .setSupportedObjectTypes(objectTypes) - .setName(subscriptionService.getName()) - .build(); - NotificationContext notificationContext = notificationContextOptional.get(); - Map notifSubscriptions = new HashMap<>(); - if (notificationContext.getNotifSubscription() != null) { - notifSubscriptions.putAll(notificationContext.getNotifSubscription()); - } - notifSubscriptions.put(notifSubscriptionKey, notifSubscription); - NotificationContext notificationContext1 = new NotificationContextBuilder() - .setNotification(notificationContext.getNotification()) - .setNotifSubscription(notifSubscriptions) - .build(); - this.networkTransactionService.merge(LogicalDatastoreType.OPERATIONAL, notificationcontextIID, - notificationContext1); - this.networkTransactionService.commit().get(); - CreateNotificationSubscriptionServiceOutput serviceOutput = - new CreateNotificationSubscriptionServiceOutputBuilder() - .setSubscriptionService(subscriptionService) - .build(); - return RpcResultBuilder.success(serviceOutput).buildFuture(); - } catch (InterruptedException | ExecutionException e) { - LOG.error("Could not create TAPI notification subscription service"); + NotifSubscriptionKey notifSubscriptionKey = new NotifSubscriptionKey(notifSubscriptionUuid); + Set notificationTypes = (subscriptionFilter.getRequestedNotificationTypes() != null) + ? subscriptionFilter.getRequestedNotificationTypes() + : new HashSet<>(List.of(NotificationType.ALARMEVENT)); + Set objectTypes = (subscriptionFilter.getRequestedObjectTypes() != null) + ? subscriptionFilter.getRequestedObjectTypes() + : new HashSet<>(List.of(ObjectType.CONNECTIVITYSERVICE)); + NotifSubscription notifSubscription = new NotifSubscriptionBuilder() + .setSubscriptionState(subscriptionService.getSubscriptionState()) + .setSubscriptionFilter(subscriptionService.getSubscriptionFilter()) + .setUuid(notifSubscriptionUuid) + .setSupportedNotificationTypes(notificationTypes) + .setSupportedObjectTypes(objectTypes) + .setName(subscriptionService.getName()) + .build(); + NotificationContext notificationContext = getNotificationContext(); + Map notifSubscriptions = new HashMap<>(); + if (notificationContext != null && notificationContext.getNotifSubscription() != null) { + notifSubscriptions.putAll(notificationContext.getNotifSubscription()); + } + notifSubscriptions.put(notifSubscriptionKey, notifSubscription); + NotificationContext notificationContext1 = new NotificationContextBuilder() + .setNotification(notificationContext == null ? new HashMap<>() : notificationContext.getNotification()) + .setNotifSubscription(notifSubscriptions) + .build(); + if (!updateNotificationContext(notificationContext1)) { + LOG.error("Failed to update Notification context"); return RpcResultBuilder.failed() - .withError(ErrorType.RPC, "Could not read notification context").buildFuture(); + .withError(ErrorType.RPC, "Failed to update notification context").buildFuture(); } + CreateNotificationSubscriptionServiceOutput serviceOutput = + new CreateNotificationSubscriptionServiceOutputBuilder() + .setSubscriptionService(subscriptionService) + .build(); + return RpcResultBuilder.success(serviceOutput).buildFuture(); } @Override public ListenableFuture> - updateNotificationSubscriptionService(UpdateNotificationSubscriptionServiceInput input) { + updateNotificationSubscriptionService(UpdateNotificationSubscriptionServiceInput input) { + // TODO --> Not yet implemented return null; } @Override public ListenableFuture> - deleteNotificationSubscriptionService(DeleteNotificationSubscriptionServiceInput input) { - return null; + deleteNotificationSubscriptionService(DeleteNotificationSubscriptionServiceInput input) { + try { + if (input == null || input.getSubscriptionIdOrName() == null) { + LOG.warn("Missing mandatory params for input {}", input); + return RpcResultBuilder.failed() + .withError(ErrorType.RPC, "Missing input parameters").buildFuture(); + } + Uuid notifSubsUuid = getUuidFromIput(input.getSubscriptionIdOrName()); + InstanceIdentifier notifSubscriptionIID = InstanceIdentifier.builder(Context.class) + .augmentation(Context1.class).child(NotificationContext.class).child(NotifSubscription.class, + new NotifSubscriptionKey(notifSubsUuid)).build(); + Optional optionalNotifSub = this.networkTransactionService.read( + LogicalDatastoreType.OPERATIONAL, notifSubscriptionIID).get(); + + if (!optionalNotifSub.isPresent()) { + return RpcResultBuilder.failed() + .withError(ErrorType.APPLICATION, + "Notification subscription doesnt exist").buildFuture(); + } + NotifSubscription notifSubscription = optionalNotifSub.get(); + this.networkTransactionService.delete(LogicalDatastoreType.OPERATIONAL, notifSubscriptionIID); + this.networkTransactionService.commit().get(); + for (Uuid objectUuid:notifSubscription.getSubscriptionFilter().getRequestedObjectIdentifier()) { + this.topicManager.deleteTapiTopic(objectUuid.getValue()); + } + return RpcResultBuilder.success(new DeleteNotificationSubscriptionServiceOutputBuilder().build()) + .buildFuture(); + } catch (InterruptedException | ExecutionException | NoSuchElementException e) { + LOG.error("Failed to delete Notification subscription service", e); + } + return RpcResultBuilder.failed() + .withError(ErrorType.APPLICATION, + "Failed to delete notification subscription service").buildFuture(); } @Override public ListenableFuture> - getNotificationSubscriptionServiceDetails(GetNotificationSubscriptionServiceDetailsInput input) { - return null; + getNotificationSubscriptionServiceDetails(GetNotificationSubscriptionServiceDetailsInput input) { + if (input == null || input.getSubscriptionIdOrName() == null) { + LOG.warn("Missing mandatory params for input {}", input); + return RpcResultBuilder.failed() + .withError(ErrorType.RPC, "Missing input parameters").buildFuture(); + } + Uuid notifSubsUuid = getUuidFromIput(input.getSubscriptionIdOrName()); + NotificationContext notificationContext = getNotificationContext(); + if (notificationContext == null) { + return RpcResultBuilder.failed() + .withError(ErrorType.APPLICATION, "Notification context is empty") + .buildFuture(); + } + if (notificationContext.getNotifSubscription() == null) { + return RpcResultBuilder.success(new GetNotificationSubscriptionServiceDetailsOutputBuilder() + .setSubscriptionService(new org.opendaylight.yang.gen.v1 + .urn.onf.otcc.yang.tapi.notification.rev181210.get.notification.subscription.service + .details.output.SubscriptionServiceBuilder().build()).build()).buildFuture(); + } + if (!notificationContext.getNotifSubscription().containsKey(new NotifSubscriptionKey(notifSubsUuid))) { + return RpcResultBuilder.failed() + .withError(ErrorType.APPLICATION, + "Notification subscription service doesnt exist").buildFuture(); + } + return RpcResultBuilder.success(new GetNotificationSubscriptionServiceDetailsOutputBuilder() + .setSubscriptionService(new org.opendaylight.yang.gen.v1.urn + .onf.otcc.yang.tapi.notification.rev181210.get.notification.subscription.service.details.output + .SubscriptionServiceBuilder(notificationContext.getNotifSubscription().get( + new NotifSubscriptionKey(notifSubsUuid))).build()).build()).buildFuture(); } @Override public ListenableFuture> getNotificationSubscriptionServiceList(GetNotificationSubscriptionServiceListInput input) { - return null; + NotificationContext notificationContext = getNotificationContext(); + if (notificationContext == null) { + return RpcResultBuilder.failed() + .withError(ErrorType.APPLICATION, "Notification context is empty") + .buildFuture(); + } + if (notificationContext.getNotifSubscription() == null) { + return RpcResultBuilder.success(new GetNotificationSubscriptionServiceListOutputBuilder() + .setSubscriptionService(new HashMap<>()).build()).buildFuture(); + } + Map + notifSubsMap = new HashMap<>(); + for (NotifSubscription notifSubscription:notificationContext.getNotifSubscription().values()) { + org.opendaylight.yang.gen.v1.urn.onf.otcc.yang + .tapi.notification.rev181210.get.notification.subscription.service.list.output.SubscriptionService + subscriptionService = new org.opendaylight.yang.gen.v1 + .urn.onf.otcc.yang.tapi.notification.rev181210.get.notification.subscription.service + .list.output.SubscriptionServiceBuilder(notifSubscription).build(); + notifSubsMap.put(subscriptionService.key(), subscriptionService); + } + return RpcResultBuilder.success(new GetNotificationSubscriptionServiceListOutputBuilder() + .setSubscriptionService(notifSubsMap).build()).buildFuture(); } @Override public ListenableFuture> getNotificationList(GetNotificationListInput input) { + try { + LOG.info("RPC getNotificationList received"); + if (input == null || input.getSubscriptionIdOrName() == null) { + LOG.warn("Missing mandatory params for input {}", input); + return RpcResultBuilder.failed().withError(ErrorType.RPC, + "Missing input parameters").buildFuture(); + } + Uuid notifSubsUuid = getUuidFromIput(input.getSubscriptionIdOrName()); + InstanceIdentifier notifSubscriptionIID = InstanceIdentifier.builder(Context.class) + .augmentation(Context1.class).child(NotificationContext.class).child(NotifSubscription.class, + new NotifSubscriptionKey(notifSubsUuid)).build(); + Optional optionalNotifSub = this.networkTransactionService.read( + LogicalDatastoreType.OPERATIONAL, notifSubscriptionIID).get(); + + if (!optionalNotifSub.isPresent()) { + return RpcResultBuilder.failed() + .withError(ErrorType.APPLICATION, + "Notification subscription doesnt exist").buildFuture(); + } + NotifSubscription notifSubscription = optionalNotifSub.get(); + List notificationTapiList = new ArrayList<>(); + for (Uuid objectUuid:notifSubscription.getSubscriptionFilter().getRequestedObjectIdentifier()) { + if (!this.topicManager.getTapiTopicMap().containsKey(objectUuid.getValue())) { + LOG.warn("Topic doesnt exist for {}", objectUuid.getValue()); + continue; + } + LOG.info("Going to get notifications for topic {}", objectUuid.getValue()); + Subscriber subscriber = new Subscriber<>( + objectUuid.getValue(), objectUuid.getValue(), server, converterTapiService, + TapiNotificationDeserializer.class); + notificationTapiList.addAll(subscriber.subscribe(objectUuid.getValue(), Notification.QNAME)); + } + LOG.info("TAPI notifications = {}", notificationTapiList); + Map notificationMap = new HashMap<>(); + for (Notification notif:notificationTapiList) { + notificationMap.put(notif.key(), notif); + } + return RpcResultBuilder.success(new GetNotificationListOutputBuilder() + .setNotification(notificationMap).build()).buildFuture(); + } catch (InterruptedException | ExecutionException | NoSuchElementException e) { + LOG.error("Failed to get Notifications from Kafka", e); + } + return RpcResultBuilder.failed() + .withError(ErrorType.APPLICATION, + "Notifications couldnt be retrieved from Kafka server").buildFuture(); + } + + private NotificationContext getNotificationContext() { + LOG.info("Getting tapi notification context"); + try { + InstanceIdentifier notificationcontextIID = + InstanceIdentifier.builder(Context.class).augmentation(Context1.class) + .child(NotificationContext.class).build(); + Optional notificationContextOptional + = this.networkTransactionService.read(LogicalDatastoreType.OPERATIONAL, notificationcontextIID).get(); + if (!notificationContextOptional.isPresent()) { + LOG.error("Could not get TAPI notification context"); + return null; + } + return notificationContextOptional.get(); + } catch (InterruptedException | ExecutionException e) { + LOG.error("Could not get TAPI notification context"); + } return null; } + + private boolean updateNotificationContext(NotificationContext notificationContext1) { + try { + InstanceIdentifier notificationcontextIID = + InstanceIdentifier.builder(Context.class).augmentation(Context1.class) + .child(NotificationContext.class).build(); + this.networkTransactionService.merge(LogicalDatastoreType.OPERATIONAL, notificationcontextIID, + notificationContext1); + this.networkTransactionService.commit().get(); + return true; + } catch (InterruptedException | ExecutionException e) { + LOG.error("Could not update TAPI notification context"); + } + return false; + } + + private Uuid getUuidFromIput(String serviceIdOrName) { + try { + UUID.fromString(serviceIdOrName); + LOG.info("Given attribute {} is a UUID", serviceIdOrName); + return new Uuid(serviceIdOrName); + } catch (IllegalArgumentException e) { + LOG.info("Given attribute {} is not a UUID", serviceIdOrName); + return new Uuid(UUID.nameUUIDFromBytes(serviceIdOrName.getBytes(StandardCharsets.UTF_8)).toString()); + } + } } diff --git a/nbinotifications/src/main/java/org/opendaylight/transportpce/nbinotifications/impl/NbiNotificationsProvider.java b/nbinotifications/src/main/java/org/opendaylight/transportpce/nbinotifications/impl/NbiNotificationsProvider.java index 7940b1bde..863d689a3 100644 --- a/nbinotifications/src/main/java/org/opendaylight/transportpce/nbinotifications/impl/NbiNotificationsProvider.java +++ b/nbinotifications/src/main/java/org/opendaylight/transportpce/nbinotifications/impl/NbiNotificationsProvider.java @@ -79,7 +79,7 @@ public class NbiNotificationsProvider { public void init() { LOG.info("NbiNotificationsProvider Session Initiated"); NbiNotificationsImpl nbiImpl = new NbiNotificationsImpl(converterService, converterAlarmService, - subscriberServer, this.networkTransactionService, this.topicManager); + converterTapiService, subscriberServer, this.networkTransactionService, this.topicManager); rpcRegistration = rpcService.registerRpcImplementation(NbiNotificationsService.class, nbiImpl); rpcService.registerRpcImplementation(TapiNotificationService.class, nbiImpl); NbiNotificationsListenerImpl nbiNotificationsListener = diff --git a/nbinotifications/src/main/java/org/opendaylight/transportpce/nbinotifications/serialization/TapiNotificationDeserializer.java b/nbinotifications/src/main/java/org/opendaylight/transportpce/nbinotifications/serialization/TapiNotificationDeserializer.java new file mode 100644 index 000000000..c4317f34d --- /dev/null +++ b/nbinotifications/src/main/java/org/opendaylight/transportpce/nbinotifications/serialization/TapiNotificationDeserializer.java @@ -0,0 +1,132 @@ +/* + * Copyright © 2021 Nokia, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.transportpce.nbinotifications.serialization; + +import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.Map; +import org.apache.kafka.common.serialization.Deserializer; +import org.opendaylight.transportpce.common.converter.JsonStringConverter; +import org.opendaylight.yang.gen.v1.nbi.notifications.rev211013.NotificationTapiService; +import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.common.rev181210.global._class.Name; +import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.common.rev181210.global._class.NameBuilder; +import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.common.rev181210.global._class.NameKey; +import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev181210.get.notification.list.output.Notification; +import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev181210.get.notification.list.output.NotificationBuilder; +import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev181210.notification.AdditionalInfo; +import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev181210.notification.AdditionalInfoBuilder; +import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev181210.notification.AdditionalInfoKey; +import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev181210.notification.AlarmInfoBuilder; +import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev181210.notification.ChangedAttributes; +import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev181210.notification.ChangedAttributesBuilder; +import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev181210.notification.ChangedAttributesKey; +import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev181210.notification.TargetObjectName; +import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev181210.notification.TargetObjectNameBuilder; +import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev181210.notification.TargetObjectNameKey; +import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev181210.notification.TcaInfoBuilder; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.codec.gson.JSONCodecFactorySupplier; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class TapiNotificationDeserializer implements Deserializer { + private static final Logger LOG = LoggerFactory.getLogger(TapiNotificationDeserializer.class); + private JsonStringConverter converter; + + @SuppressWarnings("unchecked") + @Override + public void configure(Map configs, boolean isKey) { + LOG.info("Tapi Deserializer configuration {}", configs); + if (configs.containsKey(ConfigConstants.CONVERTER) + && configs.get(ConfigConstants.CONVERTER) instanceof JsonStringConverter) { + converter = (JsonStringConverter) configs.get(ConfigConstants.CONVERTER); + } + } + + @Override + public Notification deserialize(String topic, byte[] data) { + if (converter == null) { + throw new IllegalArgumentException( + "Converter should be configured through configure method of deserializer"); + } + String value = new String(data, StandardCharsets.UTF_8); + // The message published is + // org.opendaylight.yang.gen.v1.nbi.notifications.rev211013.NotificationTapiService + // we have to map it to + // org.opendaylight.yang.gen.v1 + // .urn.onf.otcc.yang.tapi.notification.rev181210.get.notification.list.output.Notification + NotificationTapiService mappedString = converter.createDataObjectFromJsonString( + YangInstanceIdentifier.of(NotificationTapiService.QNAME), value, JSONCodecFactorySupplier.RFC7951); + if (mappedString == null) { + return null; + } + LOG.info("Reading Tapi event {}", mappedString); + return transformNotificationTapiService(mappedString); + } + + private Notification transformNotificationTapiService(NotificationTapiService mappedString) { + LOG.info("Transforming TAPI notification for getNotificationList rpc"); + Map addInfoMap = new HashMap<>(); + if (mappedString.getAdditionalInfo() != null) { + for (org.opendaylight.yang.gen.v1.nbi.notifications.rev211013.notification.tapi.service.AdditionalInfo + addInfo:mappedString.getAdditionalInfo().values()) { + AdditionalInfo transAddInfo = new AdditionalInfoBuilder() + .setValue(addInfo.getValue()) + .setValueName(addInfo.getValueName()) + .build(); + addInfoMap.put(transAddInfo.key(), transAddInfo); + } + } + Map changedAttMap = new HashMap<>(); + if (mappedString.getChangedAttributes() != null) { + for (org.opendaylight.yang.gen.v1 + .nbi.notifications.rev211013.notification.tapi.service.ChangedAttributes changedAtt:mappedString + .getChangedAttributes().values()) { + ChangedAttributes transChangedAtt = new ChangedAttributesBuilder(changedAtt).build(); + changedAttMap.put(transChangedAtt.key(), transChangedAtt); + } + } + Map nameMap = new HashMap<>(); + if (mappedString.getName() != null) { + for (Name name:mappedString.getName().values()) { + Name transName = new NameBuilder(name).build(); + nameMap.put(transName.key(), transName); + } + } + Map targetObjNameMap = new HashMap<>(); + if (mappedString.getTargetObjectName() != null) { + for (org.opendaylight.yang.gen.v1 + .nbi.notifications.rev211013.notification.tapi.service.TargetObjectName + targetObjectName:mappedString.getTargetObjectName().values()) { + TargetObjectName transTargetObjName = new TargetObjectNameBuilder(targetObjectName).build(); + targetObjNameMap.put(transTargetObjName.key(), transTargetObjName); + } + } + LOG.info("Notification uuid = {}", mappedString.getUuid().getValue()); + return new NotificationBuilder() + .setAlarmInfo(mappedString.getAlarmInfo() == null ? null + : new AlarmInfoBuilder(mappedString.getAlarmInfo()).build()) + .setAdditionalText(mappedString.getAdditionalText()) + .setAdditionalInfo(addInfoMap) + .setNotificationType(mappedString.getNotificationType()) + .setChangedAttributes(changedAttMap) + .setEventTimeStamp(mappedString.getEventTimeStamp()) + .setLayerProtocolName(mappedString.getLayerProtocolName()) + .setName(nameMap) + .setSequenceNumber(mappedString.getSequenceNumber()) + .setSourceIndicator(mappedString.getSourceIndicator()) + .setTargetObjectIdentifier(mappedString.getTargetObjectIdentifier()) + .setTargetObjectName(targetObjNameMap) + .setTargetObjectType(mappedString.getTargetObjectType()) + .setTcaInfo(mappedString.getTcaInfo() == null ? null + : new TcaInfoBuilder(mappedString.getTcaInfo()).build()) + .setUuid(mappedString.getUuid()) + .build(); + } +} + diff --git a/nbinotifications/src/main/java/org/opendaylight/transportpce/nbinotifications/utils/TopicManager.java b/nbinotifications/src/main/java/org/opendaylight/transportpce/nbinotifications/utils/TopicManager.java index 6960ab58a..ea596de38 100644 --- a/nbinotifications/src/main/java/org/opendaylight/transportpce/nbinotifications/utils/TopicManager.java +++ b/nbinotifications/src/main/java/org/opendaylight/transportpce/nbinotifications/utils/TopicManager.java @@ -95,6 +95,18 @@ public final class TopicManager { } } + public void deleteTapiTopic(String topic) { + if (!tapiPublisherMap.containsKey(topic)) { + LOG.info("Tapi topic: {} doesnt exist", topic); + return; + } + LOG.info("Deleting tapi topic: {}", topic); + tapiPublisherMap.remove(topic); + if (this.nbiNotificationsListener != null) { + this.nbiNotificationsListener.setTapiPublishersMap(tapiPublisherMap); + } + } + public Map> getTapiTopicMap() { return this.tapiPublisherMap; } diff --git a/nbinotifications/src/test/java/org/opendaylight/transportpce/nbinotifications/impl/NbiNotificationsImplTest.java b/nbinotifications/src/test/java/org/opendaylight/transportpce/nbinotifications/impl/NbiNotificationsImplTest.java index 3ba525824..018c2d622 100644 --- a/nbinotifications/src/test/java/org/opendaylight/transportpce/nbinotifications/impl/NbiNotificationsImplTest.java +++ b/nbinotifications/src/test/java/org/opendaylight/transportpce/nbinotifications/impl/NbiNotificationsImplTest.java @@ -27,6 +27,7 @@ import org.opendaylight.yang.gen.v1.nbi.notifications.rev211013.GetNotifications import org.opendaylight.yang.gen.v1.nbi.notifications.rev211013.GetNotificationsProcessServiceOutput; import org.opendaylight.yang.gen.v1.nbi.notifications.rev211013.NotificationAlarmService; import org.opendaylight.yang.gen.v1.nbi.notifications.rev211013.NotificationProcessService; +import org.opendaylight.yang.gen.v1.nbi.notifications.rev211013.NotificationTapiService; import org.opendaylight.yangtools.yang.common.RpcResult; public class NbiNotificationsImplTest extends AbstractTest { @@ -44,8 +45,11 @@ public class NbiNotificationsImplTest extends AbstractTest { getDataStoreContextUtil().getBindingDOMCodecServices()); JsonStringConverter converterAlarm = new JsonStringConverter<>( getDataStoreContextUtil().getBindingDOMCodecServices()); - nbiNotificationsImpl = new NbiNotificationsImpl(converter, converterAlarm,"localhost:8080", - networkTransactionService, topicManager); + JsonStringConverter converterTapi = new JsonStringConverter<>( + getDataStoreContextUtil().getBindingDOMCodecServices()); + + nbiNotificationsImpl = new NbiNotificationsImpl(converter, converterAlarm, converterTapi, + "localhost:8080", networkTransactionService, topicManager); } @Test -- 2.36.6