Implementation of T-API notification rpcs 50/98050/29
authorJavier Errea <errea@eurecom.fr>
Fri, 22 Oct 2021 13:40:49 +0000 (15:40 +0200)
committerGilles Thouenon <gilles.thouenon@orange.com>
Fri, 8 Jul 2022 13:09:08 +0000 (15:09 +0200)
- Tapi notification rpcs implementation
- tapi notification de-serializer
- Add ENABLE_AUTO_COMMIT_CONFIG to kafka subscriber
  configuration & and TopicPartition 0 to ConsumerRecords
  to enable to read TapiNotifications

JIRA: TRNSPRTPCE-560
Signed-off-by: errea <errea@eurecom.fr>
Change-Id: Ic49522b43c6351835eec67001e42887392a71238

nbinotifications/src/main/java/org/opendaylight/transportpce/nbinotifications/consumer/Subscriber.java
nbinotifications/src/main/java/org/opendaylight/transportpce/nbinotifications/impl/NbiNotificationsImpl.java
nbinotifications/src/main/java/org/opendaylight/transportpce/nbinotifications/impl/NbiNotificationsProvider.java
nbinotifications/src/main/java/org/opendaylight/transportpce/nbinotifications/serialization/TapiNotificationDeserializer.java [new file with mode: 0644]
nbinotifications/src/main/java/org/opendaylight/transportpce/nbinotifications/utils/TopicManager.java
nbinotifications/src/test/java/org/opendaylight/transportpce/nbinotifications/impl/NbiNotificationsImplTest.java

index 7ebc0bdf620cb09a11bc9e3e31fe691298374f5a..154cbb80f4d191bd225a8d24c2bc11f9d52d1de2 100644 (file)
@@ -18,6 +18,7 @@ import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.eclipse.jdt.annotation.NonNull;
 import org.opendaylight.transportpce.common.converter.JsonStringConverter;
@@ -39,6 +40,7 @@ public class Subscriber<T extends DataObject, D> {
         Properties propsConsumer = NbiNotificationsUtils.loadProperties("subscriber.properties");
         propsConsumer.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
         propsConsumer.put(ConsumerConfig.CLIENT_ID_CONFIG, id);
+        propsConsumer.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
         propsConsumer.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
         propsConsumer.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG , deserializerConf);
         propsConsumer.put(ConfigConstants.CONVERTER , deserializer);
@@ -55,7 +57,7 @@ public class Subscriber<T extends DataObject, D> {
         final ConsumerRecords<String, D> consumerRecords = consumer.poll(Duration.ofMillis(1000));
         List<D> notificationServiceList = new ArrayList<>();
         YangInstanceIdentifier.of(name);
-        for (ConsumerRecord<String, D> record : consumerRecords) {
+        for (ConsumerRecord<String, D> record : consumerRecords.records(new TopicPartition(topicName, 0))) {
             if (record.value() != null) {
                 notificationServiceList.add(record.value());
             }
index ea5715f7d16ec0f768b24ab60efc38af9719862a..0a23c16e00b706dc2306efd655bc6bb8f74338ec 100644 (file)
@@ -8,10 +8,13 @@
 package org.opendaylight.transportpce.nbinotifications.impl;
 
 import com.google.common.util.concurrent.ListenableFuture;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.NoSuchElementException;
 import java.util.Optional;
 import java.util.Set;
 import java.util.UUID;
@@ -22,6 +25,7 @@ import org.opendaylight.transportpce.common.network.NetworkTransactionService;
 import org.opendaylight.transportpce.nbinotifications.consumer.Subscriber;
 import org.opendaylight.transportpce.nbinotifications.serialization.NotificationAlarmServiceDeserializer;
 import org.opendaylight.transportpce.nbinotifications.serialization.NotificationServiceDeserializer;
+import org.opendaylight.transportpce.nbinotifications.serialization.TapiNotificationDeserializer;
 import org.opendaylight.transportpce.nbinotifications.utils.TopicManager;
 import org.opendaylight.yang.gen.v1.nbi.notifications.rev211013.GetNotificationsAlarmServiceInput;
 import org.opendaylight.yang.gen.v1.nbi.notifications.rev211013.GetNotificationsAlarmServiceOutput;
@@ -32,6 +36,7 @@ import org.opendaylight.yang.gen.v1.nbi.notifications.rev211013.GetNotifications
 import org.opendaylight.yang.gen.v1.nbi.notifications.rev211013.NbiNotificationsService;
 import org.opendaylight.yang.gen.v1.nbi.notifications.rev211013.NotificationAlarmService;
 import org.opendaylight.yang.gen.v1.nbi.notifications.rev211013.NotificationProcessService;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev211013.NotificationTapiService;
 import org.opendaylight.yang.gen.v1.nbi.notifications.rev211013.get.notifications.alarm.service.output.NotificationsAlarmService;
 import org.opendaylight.yang.gen.v1.nbi.notifications.rev211013.get.notifications.process.service.output.NotificationsProcessService;
 import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.common.rev181210.Context;
@@ -42,14 +47,19 @@ import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev18121
 import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev181210.CreateNotificationSubscriptionServiceOutputBuilder;
 import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev181210.DeleteNotificationSubscriptionServiceInput;
 import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev181210.DeleteNotificationSubscriptionServiceOutput;
+import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev181210.DeleteNotificationSubscriptionServiceOutputBuilder;
 import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev181210.GetNotificationListInput;
 import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev181210.GetNotificationListOutput;
+import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev181210.GetNotificationListOutputBuilder;
 import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev181210.GetNotificationSubscriptionServiceDetailsInput;
 import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev181210.GetNotificationSubscriptionServiceDetailsOutput;
+import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev181210.GetNotificationSubscriptionServiceDetailsOutputBuilder;
 import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev181210.GetNotificationSubscriptionServiceListInput;
 import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev181210.GetNotificationSubscriptionServiceListOutput;
+import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev181210.GetNotificationSubscriptionServiceListOutputBuilder;
 import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev181210.GetSupportedNotificationTypesInput;
 import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev181210.GetSupportedNotificationTypesOutput;
+import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev181210.GetSupportedNotificationTypesOutputBuilder;
 import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev181210.NotificationType;
 import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev181210.ObjectType;
 import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev181210.TapiNotificationService;
@@ -59,6 +69,9 @@ import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev18121
 import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev181210.context.NotificationContextBuilder;
 import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev181210.create.notification.subscription.service.output.SubscriptionService;
 import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev181210.create.notification.subscription.service.output.SubscriptionServiceBuilder;
+import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev181210.get.notification.list.output.Notification;
+import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev181210.get.notification.list.output.NotificationKey;
+import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev181210.get.notification.subscription.service.list.output.SubscriptionServiceKey;
 import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev181210.notification.context.NotifSubscription;
 import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev181210.notification.context.NotifSubscriptionBuilder;
 import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev181210.notification.context.NotifSubscriptionKey;
@@ -75,15 +88,18 @@ public class NbiNotificationsImpl implements NbiNotificationsService, TapiNotifi
     private static final Logger LOG = LoggerFactory.getLogger(NbiNotificationsImpl.class);
     private final JsonStringConverter<NotificationProcessService> converterService;
     private final JsonStringConverter<NotificationAlarmService> converterAlarmService;
+    private final JsonStringConverter<NotificationTapiService> converterTapiService;
     private final String server;
     private final NetworkTransactionService networkTransactionService;
     private final TopicManager topicManager;
 
     public NbiNotificationsImpl(JsonStringConverter<NotificationProcessService> converterService,
-                                JsonStringConverter<NotificationAlarmService> converterAlarmService, String server,
+                                JsonStringConverter<NotificationAlarmService> converterAlarmService,
+                                JsonStringConverter<NotificationTapiService> converterTapiService, String server,
                                 NetworkTransactionService networkTransactionService, TopicManager topicManager) {
         this.converterService = converterService;
         this.converterAlarmService = converterAlarmService;
+        this.converterTapiService = converterTapiService;
         this.server = server;
         this.networkTransactionService = networkTransactionService;
         this.topicManager = topicManager;
@@ -125,111 +141,289 @@ public class NbiNotificationsImpl implements NbiNotificationsService, TapiNotifi
 
     @Override
     public ListenableFuture<RpcResult<GetSupportedNotificationTypesOutput>>
-        getSupportedNotificationTypes(GetSupportedNotificationTypesInput input) {
-        return null;
+            getSupportedNotificationTypes(GetSupportedNotificationTypesInput input) {
+        NotificationContext notificationContext = getNotificationContext();
+        if (notificationContext == null) {
+            return RpcResultBuilder.<GetSupportedNotificationTypesOutput>failed()
+                .withError(ErrorType.APPLICATION, "Couldnt get Notification Context from Datastore")
+                .buildFuture();
+        }
+        if (notificationContext.getNotifSubscription() == null) {
+            return RpcResultBuilder.success(new GetSupportedNotificationTypesOutputBuilder()
+                .setSupportedNotificationTypes(new HashSet<>())
+                .setSupportedObjectTypes(new HashSet<>()).build()).buildFuture();
+        }
+        Set<NotificationType> notificationTypeList = new HashSet<>();
+        Set<ObjectType> objectTypeList = new HashSet<>();
+        for (NotifSubscription notifSubscription:notificationContext.getNotifSubscription().values()) {
+            if (notifSubscription.getSupportedNotificationTypes() != null) {
+                notificationTypeList.addAll(notifSubscription.getSupportedNotificationTypes());
+            }
+            if (notifSubscription.getSupportedObjectTypes() != null) {
+                objectTypeList.addAll(notifSubscription.getSupportedObjectTypes());
+            }
+        }
+        return RpcResultBuilder.success(new GetSupportedNotificationTypesOutputBuilder()
+            .setSupportedNotificationTypes(notificationTypeList)
+            .setSupportedObjectTypes(objectTypeList).build()).buildFuture();
     }
 
     @Override
     public ListenableFuture<RpcResult<CreateNotificationSubscriptionServiceOutput>>
             createNotificationSubscriptionService(CreateNotificationSubscriptionServiceInput input) {
-        try {
-            for (Uuid uuid:input.getSubscriptionFilter().getRequestedObjectIdentifier()) {
-                LOG.info("Adding T-API topic: {} to Kafka server", uuid.getValue());
-                this.topicManager.addTapiTopic(uuid.getValue());
-            }
+        for (Uuid uuid:input.getSubscriptionFilter().getRequestedObjectIdentifier()) {
+            LOG.info("Adding T-API topic: {} to Kafka server", uuid.getValue());
+            this.topicManager.addTapiTopic(uuid.getValue());
+        }
+        SubscriptionFilter subscriptionFilter = new SubscriptionFilterBuilder()
+            .setName(input.getSubscriptionFilter().getName())
+            .setLocalId(input.getSubscriptionFilter().getLocalId())
+            .setIncludeContent(input.getSubscriptionFilter().getIncludeContent())
+            .setRequestedNotificationTypes(input.getSubscriptionFilter().getRequestedNotificationTypes())
+            .setRequestedLayerProtocols(input.getSubscriptionFilter().getRequestedLayerProtocols())
+            .setRequestedObjectIdentifier(input.getSubscriptionFilter().getRequestedObjectIdentifier())
+            .setRequestedObjectTypes(input.getSubscriptionFilter().getRequestedObjectTypes())
+            .build();
+        Uuid notifSubscriptionUuid = new Uuid(UUID.randomUUID().toString());
+        SubscriptionService subscriptionService = new SubscriptionServiceBuilder()
+            .setSubscriptionFilter(subscriptionFilter)
+            .setSubscriptionState(input.getSubscriptionState())
+            .setUuid(notifSubscriptionUuid)
+            .build();
 
-            InstanceIdentifier<NotificationContext> notificationcontextIID =
-                InstanceIdentifier.builder(Context.class).augmentation(Context1.class)
-                    .child(org.opendaylight.yang.gen.v1.urn
-                        .onf.otcc.yang.tapi.notification.rev181210.context.NotificationContext.class)
-                    .build();
-            Optional<NotificationContext> notificationContextOptional
-                = this.networkTransactionService.read(LogicalDatastoreType.OPERATIONAL, notificationcontextIID).get();
-            if (notificationContextOptional.isEmpty()) {
-                LOG.error("Could not create TAPI notification subscription service");
-                return RpcResultBuilder.<CreateNotificationSubscriptionServiceOutput>failed()
-                    .withError(ErrorType.RPC, "Could not read notification context")
-                    .buildFuture();
-            }
-            SubscriptionFilter subscriptionFilter = new SubscriptionFilterBuilder()
-                .setName(input.getSubscriptionFilter().getName())
-                .setLocalId(input.getSubscriptionFilter().getLocalId())
-                .setIncludeContent(input.getSubscriptionFilter().getIncludeContent())
-                .setRequestedNotificationTypes(input.getSubscriptionFilter().getRequestedNotificationTypes())
-                .setRequestedLayerProtocols(input.getSubscriptionFilter().getRequestedLayerProtocols())
-                .setRequestedObjectIdentifier(input.getSubscriptionFilter().getRequestedObjectIdentifier())
-                .setRequestedObjectTypes(input.getSubscriptionFilter().getRequestedObjectTypes())
-                .build();
-            SubscriptionService subscriptionService = new SubscriptionServiceBuilder()
-                .setSubscriptionFilter(subscriptionFilter)
-                .setSubscriptionState(input.getSubscriptionState()).build();
-            Uuid notifSubscriptionUuid = new Uuid(UUID.randomUUID().toString());
-            NotifSubscriptionKey notifSubscriptionKey = new NotifSubscriptionKey(notifSubscriptionUuid);
-            Set<NotificationType> notificationTypes = (subscriptionFilter.getRequestedNotificationTypes() != null)
-                ? subscriptionFilter.getRequestedNotificationTypes()
-                : new HashSet<>(List.of(NotificationType.ALARMEVENT));
-            Set<ObjectType> objectTypes = (subscriptionFilter.getRequestedObjectTypes() != null)
-                ? subscriptionFilter.getRequestedObjectTypes()
-                : new HashSet<>(List.of(ObjectType.CONNECTIVITYSERVICE));
-            NotifSubscription notifSubscription = new NotifSubscriptionBuilder()
-                .setSubscriptionState(subscriptionService.getSubscriptionState())
-                .setSubscriptionFilter(subscriptionService.getSubscriptionFilter())
-                .setUuid(notifSubscriptionUuid)
-                .setSupportedNotificationTypes(notificationTypes)
-                .setSupportedObjectTypes(objectTypes)
-                .setName(subscriptionService.getName())
-                .build();
-            NotificationContext notificationContext = notificationContextOptional.get();
-            Map<NotifSubscriptionKey, NotifSubscription> notifSubscriptions = new HashMap<>();
-            if (notificationContext.getNotifSubscription() != null) {
-                notifSubscriptions.putAll(notificationContext.getNotifSubscription());
-            }
-            notifSubscriptions.put(notifSubscriptionKey, notifSubscription);
-            NotificationContext notificationContext1 = new NotificationContextBuilder()
-                .setNotification(notificationContext.getNotification())
-                .setNotifSubscription(notifSubscriptions)
-                .build();
-            this.networkTransactionService.merge(LogicalDatastoreType.OPERATIONAL, notificationcontextIID,
-                notificationContext1);
-            this.networkTransactionService.commit().get();
-            CreateNotificationSubscriptionServiceOutput serviceOutput =
-                new CreateNotificationSubscriptionServiceOutputBuilder()
-                    .setSubscriptionService(subscriptionService)
-                    .build();
-            return RpcResultBuilder.success(serviceOutput).buildFuture();
-        } catch (InterruptedException | ExecutionException e) {
-            LOG.error("Could not create TAPI notification subscription service");
+        NotifSubscriptionKey notifSubscriptionKey = new NotifSubscriptionKey(notifSubscriptionUuid);
+        Set<NotificationType> notificationTypes = (subscriptionFilter.getRequestedNotificationTypes() != null)
+            ? subscriptionFilter.getRequestedNotificationTypes()
+            : new HashSet<>(List.of(NotificationType.ALARMEVENT));
+        Set<ObjectType> objectTypes = (subscriptionFilter.getRequestedObjectTypes() != null)
+            ? subscriptionFilter.getRequestedObjectTypes()
+            : new HashSet<>(List.of(ObjectType.CONNECTIVITYSERVICE));
+        NotifSubscription notifSubscription = new NotifSubscriptionBuilder()
+            .setSubscriptionState(subscriptionService.getSubscriptionState())
+            .setSubscriptionFilter(subscriptionService.getSubscriptionFilter())
+            .setUuid(notifSubscriptionUuid)
+            .setSupportedNotificationTypes(notificationTypes)
+            .setSupportedObjectTypes(objectTypes)
+            .setName(subscriptionService.getName())
+            .build();
+        NotificationContext notificationContext = getNotificationContext();
+        Map<NotifSubscriptionKey, NotifSubscription> notifSubscriptions = new HashMap<>();
+        if (notificationContext != null && notificationContext.getNotifSubscription() != null) {
+            notifSubscriptions.putAll(notificationContext.getNotifSubscription());
+        }
+        notifSubscriptions.put(notifSubscriptionKey, notifSubscription);
+        NotificationContext notificationContext1 = new NotificationContextBuilder()
+            .setNotification(notificationContext == null ? new HashMap<>() : notificationContext.getNotification())
+            .setNotifSubscription(notifSubscriptions)
+            .build();
+        if (!updateNotificationContext(notificationContext1)) {
+            LOG.error("Failed to update Notification context");
             return RpcResultBuilder.<CreateNotificationSubscriptionServiceOutput>failed()
-                .withError(ErrorType.RPC, "Could not read notification context").buildFuture();
+                .withError(ErrorType.RPC, "Failed to update notification context").buildFuture();
         }
+        CreateNotificationSubscriptionServiceOutput serviceOutput =
+            new CreateNotificationSubscriptionServiceOutputBuilder()
+                .setSubscriptionService(subscriptionService)
+                .build();
+        return RpcResultBuilder.success(serviceOutput).buildFuture();
     }
 
     @Override
     public ListenableFuture<RpcResult<UpdateNotificationSubscriptionServiceOutput>>
-        updateNotificationSubscriptionService(UpdateNotificationSubscriptionServiceInput input) {
+            updateNotificationSubscriptionService(UpdateNotificationSubscriptionServiceInput input) {
+        // TODO --> Not yet implemented
         return null;
     }
 
     @Override
     public ListenableFuture<RpcResult<DeleteNotificationSubscriptionServiceOutput>>
-        deleteNotificationSubscriptionService(DeleteNotificationSubscriptionServiceInput input) {
-        return null;
+            deleteNotificationSubscriptionService(DeleteNotificationSubscriptionServiceInput input) {
+        try {
+            if (input == null || input.getSubscriptionIdOrName() == null) {
+                LOG.warn("Missing mandatory params for input {}", input);
+                return RpcResultBuilder.<DeleteNotificationSubscriptionServiceOutput>failed()
+                    .withError(ErrorType.RPC, "Missing input parameters").buildFuture();
+            }
+            Uuid notifSubsUuid = getUuidFromIput(input.getSubscriptionIdOrName());
+            InstanceIdentifier<NotifSubscription> notifSubscriptionIID = InstanceIdentifier.builder(Context.class)
+                .augmentation(Context1.class).child(NotificationContext.class).child(NotifSubscription.class,
+                    new NotifSubscriptionKey(notifSubsUuid)).build();
+            Optional<NotifSubscription> optionalNotifSub = this.networkTransactionService.read(
+                LogicalDatastoreType.OPERATIONAL, notifSubscriptionIID).get();
+
+            if (!optionalNotifSub.isPresent()) {
+                return RpcResultBuilder.<DeleteNotificationSubscriptionServiceOutput>failed()
+                    .withError(ErrorType.APPLICATION,
+                        "Notification subscription doesnt exist").buildFuture();
+            }
+            NotifSubscription notifSubscription = optionalNotifSub.get();
+            this.networkTransactionService.delete(LogicalDatastoreType.OPERATIONAL, notifSubscriptionIID);
+            this.networkTransactionService.commit().get();
+            for (Uuid objectUuid:notifSubscription.getSubscriptionFilter().getRequestedObjectIdentifier()) {
+                this.topicManager.deleteTapiTopic(objectUuid.getValue());
+            }
+            return RpcResultBuilder.success(new DeleteNotificationSubscriptionServiceOutputBuilder().build())
+                .buildFuture();
+        } catch (InterruptedException | ExecutionException | NoSuchElementException e) {
+            LOG.error("Failed to delete Notification subscription service", e);
+        }
+        return RpcResultBuilder.<DeleteNotificationSubscriptionServiceOutput>failed()
+            .withError(ErrorType.APPLICATION,
+                "Failed to delete notification subscription service").buildFuture();
     }
 
     @Override
     public ListenableFuture<RpcResult<GetNotificationSubscriptionServiceDetailsOutput>>
-        getNotificationSubscriptionServiceDetails(GetNotificationSubscriptionServiceDetailsInput input) {
-        return null;
+            getNotificationSubscriptionServiceDetails(GetNotificationSubscriptionServiceDetailsInput input) {
+        if (input == null || input.getSubscriptionIdOrName() == null) {
+            LOG.warn("Missing mandatory params for input {}", input);
+            return RpcResultBuilder.<GetNotificationSubscriptionServiceDetailsOutput>failed()
+                .withError(ErrorType.RPC, "Missing input parameters").buildFuture();
+        }
+        Uuid notifSubsUuid = getUuidFromIput(input.getSubscriptionIdOrName());
+        NotificationContext notificationContext = getNotificationContext();
+        if (notificationContext == null) {
+            return RpcResultBuilder.<GetNotificationSubscriptionServiceDetailsOutput>failed()
+                .withError(ErrorType.APPLICATION, "Notification context is empty")
+                .buildFuture();
+        }
+        if (notificationContext.getNotifSubscription() == null) {
+            return RpcResultBuilder.success(new GetNotificationSubscriptionServiceDetailsOutputBuilder()
+                .setSubscriptionService(new org.opendaylight.yang.gen.v1
+                    .urn.onf.otcc.yang.tapi.notification.rev181210.get.notification.subscription.service
+                        .details.output.SubscriptionServiceBuilder().build()).build()).buildFuture();
+        }
+        if (!notificationContext.getNotifSubscription().containsKey(new NotifSubscriptionKey(notifSubsUuid))) {
+            return RpcResultBuilder.<GetNotificationSubscriptionServiceDetailsOutput>failed()
+                .withError(ErrorType.APPLICATION,
+                    "Notification subscription service doesnt exist").buildFuture();
+        }
+        return RpcResultBuilder.success(new GetNotificationSubscriptionServiceDetailsOutputBuilder()
+            .setSubscriptionService(new org.opendaylight.yang.gen.v1.urn
+                .onf.otcc.yang.tapi.notification.rev181210.get.notification.subscription.service.details.output
+                .SubscriptionServiceBuilder(notificationContext.getNotifSubscription().get(
+                    new NotifSubscriptionKey(notifSubsUuid))).build()).build()).buildFuture();
     }
 
     @Override
     public ListenableFuture<RpcResult<GetNotificationSubscriptionServiceListOutput>>
             getNotificationSubscriptionServiceList(GetNotificationSubscriptionServiceListInput input) {
-        return null;
+        NotificationContext notificationContext = getNotificationContext();
+        if (notificationContext == null) {
+            return RpcResultBuilder.<GetNotificationSubscriptionServiceListOutput>failed()
+                .withError(ErrorType.APPLICATION, "Notification context is empty")
+                .buildFuture();
+        }
+        if (notificationContext.getNotifSubscription() == null) {
+            return RpcResultBuilder.success(new GetNotificationSubscriptionServiceListOutputBuilder()
+                .setSubscriptionService(new HashMap<>()).build()).buildFuture();
+        }
+        Map<SubscriptionServiceKey, org.opendaylight.yang.gen.v1.urn.onf.otcc.yang
+            .tapi.notification.rev181210.get.notification.subscription.service.list.output.SubscriptionService>
+                notifSubsMap = new HashMap<>();
+        for (NotifSubscription notifSubscription:notificationContext.getNotifSubscription().values()) {
+            org.opendaylight.yang.gen.v1.urn.onf.otcc.yang
+                .tapi.notification.rev181210.get.notification.subscription.service.list.output.SubscriptionService
+                    subscriptionService = new org.opendaylight.yang.gen.v1
+                        .urn.onf.otcc.yang.tapi.notification.rev181210.get.notification.subscription.service
+                            .list.output.SubscriptionServiceBuilder(notifSubscription).build();
+            notifSubsMap.put(subscriptionService.key(), subscriptionService);
+        }
+        return RpcResultBuilder.success(new GetNotificationSubscriptionServiceListOutputBuilder()
+            .setSubscriptionService(notifSubsMap).build()).buildFuture();
     }
 
     @Override
     public ListenableFuture<RpcResult<GetNotificationListOutput>> getNotificationList(GetNotificationListInput input) {
+        try {
+            LOG.info("RPC getNotificationList received");
+            if (input == null || input.getSubscriptionIdOrName() == null) {
+                LOG.warn("Missing mandatory params for input {}", input);
+                return RpcResultBuilder.<GetNotificationListOutput>failed().withError(ErrorType.RPC,
+                    "Missing input parameters").buildFuture();
+            }
+            Uuid notifSubsUuid = getUuidFromIput(input.getSubscriptionIdOrName());
+            InstanceIdentifier<NotifSubscription> notifSubscriptionIID = InstanceIdentifier.builder(Context.class)
+                .augmentation(Context1.class).child(NotificationContext.class).child(NotifSubscription.class,
+                    new NotifSubscriptionKey(notifSubsUuid)).build();
+            Optional<NotifSubscription> optionalNotifSub = this.networkTransactionService.read(
+                LogicalDatastoreType.OPERATIONAL, notifSubscriptionIID).get();
+
+            if (!optionalNotifSub.isPresent()) {
+                return RpcResultBuilder.<GetNotificationListOutput>failed()
+                    .withError(ErrorType.APPLICATION,
+                        "Notification subscription doesnt exist").buildFuture();
+            }
+            NotifSubscription notifSubscription = optionalNotifSub.get();
+            List<Notification> notificationTapiList = new ArrayList<>();
+            for (Uuid objectUuid:notifSubscription.getSubscriptionFilter().getRequestedObjectIdentifier()) {
+                if (!this.topicManager.getTapiTopicMap().containsKey(objectUuid.getValue())) {
+                    LOG.warn("Topic doesnt exist for {}", objectUuid.getValue());
+                    continue;
+                }
+                LOG.info("Going to get notifications for topic {}", objectUuid.getValue());
+                Subscriber<NotificationTapiService, Notification> subscriber = new Subscriber<>(
+                    objectUuid.getValue(), objectUuid.getValue(), server, converterTapiService,
+                    TapiNotificationDeserializer.class);
+                notificationTapiList.addAll(subscriber.subscribe(objectUuid.getValue(), Notification.QNAME));
+            }
+            LOG.info("TAPI notifications = {}", notificationTapiList);
+            Map<NotificationKey, Notification> notificationMap = new HashMap<>();
+            for (Notification notif:notificationTapiList) {
+                notificationMap.put(notif.key(), notif);
+            }
+            return RpcResultBuilder.success(new GetNotificationListOutputBuilder()
+                .setNotification(notificationMap).build()).buildFuture();
+        } catch (InterruptedException | ExecutionException | NoSuchElementException e) {
+            LOG.error("Failed to get Notifications from Kafka", e);
+        }
+        return RpcResultBuilder.<GetNotificationListOutput>failed()
+            .withError(ErrorType.APPLICATION,
+                "Notifications couldnt be retrieved from Kafka server").buildFuture();
+    }
+
+    private NotificationContext getNotificationContext() {
+        LOG.info("Getting tapi notification context");
+        try {
+            InstanceIdentifier<NotificationContext> notificationcontextIID =
+                InstanceIdentifier.builder(Context.class).augmentation(Context1.class)
+                    .child(NotificationContext.class).build();
+            Optional<NotificationContext> notificationContextOptional
+                = this.networkTransactionService.read(LogicalDatastoreType.OPERATIONAL, notificationcontextIID).get();
+            if (!notificationContextOptional.isPresent()) {
+                LOG.error("Could not get TAPI notification context");
+                return null;
+            }
+            return notificationContextOptional.get();
+        } catch (InterruptedException | ExecutionException e) {
+            LOG.error("Could not get TAPI notification context");
+        }
         return null;
     }
+
+    private boolean updateNotificationContext(NotificationContext notificationContext1) {
+        try {
+            InstanceIdentifier<NotificationContext> notificationcontextIID =
+                InstanceIdentifier.builder(Context.class).augmentation(Context1.class)
+                    .child(NotificationContext.class).build();
+            this.networkTransactionService.merge(LogicalDatastoreType.OPERATIONAL, notificationcontextIID,
+                notificationContext1);
+            this.networkTransactionService.commit().get();
+            return true;
+        } catch (InterruptedException | ExecutionException e) {
+            LOG.error("Could not update TAPI notification context");
+        }
+        return false;
+    }
+
+    private Uuid getUuidFromIput(String serviceIdOrName) {
+        try {
+            UUID.fromString(serviceIdOrName);
+            LOG.info("Given attribute {} is a UUID", serviceIdOrName);
+            return new Uuid(serviceIdOrName);
+        } catch (IllegalArgumentException e) {
+            LOG.info("Given attribute {} is not a UUID", serviceIdOrName);
+            return new Uuid(UUID.nameUUIDFromBytes(serviceIdOrName.getBytes(StandardCharsets.UTF_8)).toString());
+        }
+    }
 }
index 7940b1bde8b8ea0deb1201cd7949c2366ec09f15..863d689a3af8db038276ca6435902caf1f8b2c2d 100644 (file)
@@ -79,7 +79,7 @@ public class NbiNotificationsProvider {
     public void init() {
         LOG.info("NbiNotificationsProvider Session Initiated");
         NbiNotificationsImpl nbiImpl = new NbiNotificationsImpl(converterService, converterAlarmService,
-            subscriberServer, this.networkTransactionService, this.topicManager);
+            converterTapiService, subscriberServer, this.networkTransactionService, this.topicManager);
         rpcRegistration = rpcService.registerRpcImplementation(NbiNotificationsService.class, nbiImpl);
         rpcService.registerRpcImplementation(TapiNotificationService.class, nbiImpl);
         NbiNotificationsListenerImpl nbiNotificationsListener =
diff --git a/nbinotifications/src/main/java/org/opendaylight/transportpce/nbinotifications/serialization/TapiNotificationDeserializer.java b/nbinotifications/src/main/java/org/opendaylight/transportpce/nbinotifications/serialization/TapiNotificationDeserializer.java
new file mode 100644 (file)
index 0000000..c4317f3
--- /dev/null
@@ -0,0 +1,132 @@
+/*
+ * Copyright © 2021 Nokia, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.transportpce.nbinotifications.serialization;
+
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.opendaylight.transportpce.common.converter.JsonStringConverter;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev211013.NotificationTapiService;
+import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.common.rev181210.global._class.Name;
+import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.common.rev181210.global._class.NameBuilder;
+import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.common.rev181210.global._class.NameKey;
+import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev181210.get.notification.list.output.Notification;
+import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev181210.get.notification.list.output.NotificationBuilder;
+import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev181210.notification.AdditionalInfo;
+import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev181210.notification.AdditionalInfoBuilder;
+import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev181210.notification.AdditionalInfoKey;
+import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev181210.notification.AlarmInfoBuilder;
+import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev181210.notification.ChangedAttributes;
+import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev181210.notification.ChangedAttributesBuilder;
+import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev181210.notification.ChangedAttributesKey;
+import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev181210.notification.TargetObjectName;
+import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev181210.notification.TargetObjectNameBuilder;
+import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev181210.notification.TargetObjectNameKey;
+import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev181210.notification.TcaInfoBuilder;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.codec.gson.JSONCodecFactorySupplier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TapiNotificationDeserializer implements Deserializer<Notification> {
+    private static final Logger LOG = LoggerFactory.getLogger(TapiNotificationDeserializer.class);
+    private JsonStringConverter<NotificationTapiService> converter;
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public void configure(Map<String, ?> configs, boolean isKey) {
+        LOG.info("Tapi Deserializer configuration {}", configs);
+        if (configs.containsKey(ConfigConstants.CONVERTER)
+            && configs.get(ConfigConstants.CONVERTER) instanceof JsonStringConverter<?>) {
+            converter = (JsonStringConverter<NotificationTapiService>) configs.get(ConfigConstants.CONVERTER);
+        }
+    }
+
+    @Override
+    public Notification deserialize(String topic, byte[] data) {
+        if (converter == null) {
+            throw new IllegalArgumentException(
+                "Converter should be configured through configure method of deserializer");
+        }
+        String value = new String(data, StandardCharsets.UTF_8);
+        // The message published is
+        // org.opendaylight.yang.gen.v1.nbi.notifications.rev211013.NotificationTapiService
+        // we have to map it to
+        // org.opendaylight.yang.gen.v1
+        // .urn.onf.otcc.yang.tapi.notification.rev181210.get.notification.list.output.Notification
+        NotificationTapiService mappedString = converter.createDataObjectFromJsonString(
+            YangInstanceIdentifier.of(NotificationTapiService.QNAME), value, JSONCodecFactorySupplier.RFC7951);
+        if (mappedString == null) {
+            return null;
+        }
+        LOG.info("Reading Tapi event {}", mappedString);
+        return transformNotificationTapiService(mappedString);
+    }
+
+    private Notification transformNotificationTapiService(NotificationTapiService mappedString) {
+        LOG.info("Transforming TAPI notification for getNotificationList rpc");
+        Map<AdditionalInfoKey, AdditionalInfo> addInfoMap = new HashMap<>();
+        if (mappedString.getAdditionalInfo() != null) {
+            for (org.opendaylight.yang.gen.v1.nbi.notifications.rev211013.notification.tapi.service.AdditionalInfo
+                    addInfo:mappedString.getAdditionalInfo().values()) {
+                AdditionalInfo transAddInfo = new AdditionalInfoBuilder()
+                    .setValue(addInfo.getValue())
+                    .setValueName(addInfo.getValueName())
+                    .build();
+                addInfoMap.put(transAddInfo.key(), transAddInfo);
+            }
+        }
+        Map<ChangedAttributesKey, ChangedAttributes> changedAttMap = new HashMap<>();
+        if (mappedString.getChangedAttributes() != null) {
+            for (org.opendaylight.yang.gen.v1
+                    .nbi.notifications.rev211013.notification.tapi.service.ChangedAttributes changedAtt:mappedString
+                        .getChangedAttributes().values()) {
+                ChangedAttributes transChangedAtt = new ChangedAttributesBuilder(changedAtt).build();
+                changedAttMap.put(transChangedAtt.key(), transChangedAtt);
+            }
+        }
+        Map<NameKey, Name> nameMap = new HashMap<>();
+        if (mappedString.getName() != null) {
+            for (Name name:mappedString.getName().values()) {
+                Name transName = new NameBuilder(name).build();
+                nameMap.put(transName.key(), transName);
+            }
+        }
+        Map<TargetObjectNameKey, TargetObjectName> targetObjNameMap = new HashMap<>();
+        if (mappedString.getTargetObjectName() != null) {
+            for (org.opendaylight.yang.gen.v1
+                    .nbi.notifications.rev211013.notification.tapi.service.TargetObjectName
+                        targetObjectName:mappedString.getTargetObjectName().values()) {
+                TargetObjectName transTargetObjName = new TargetObjectNameBuilder(targetObjectName).build();
+                targetObjNameMap.put(transTargetObjName.key(), transTargetObjName);
+            }
+        }
+        LOG.info("Notification uuid = {}", mappedString.getUuid().getValue());
+        return new NotificationBuilder()
+            .setAlarmInfo(mappedString.getAlarmInfo() == null ? null
+                : new AlarmInfoBuilder(mappedString.getAlarmInfo()).build())
+            .setAdditionalText(mappedString.getAdditionalText())
+            .setAdditionalInfo(addInfoMap)
+            .setNotificationType(mappedString.getNotificationType())
+            .setChangedAttributes(changedAttMap)
+            .setEventTimeStamp(mappedString.getEventTimeStamp())
+            .setLayerProtocolName(mappedString.getLayerProtocolName())
+            .setName(nameMap)
+            .setSequenceNumber(mappedString.getSequenceNumber())
+            .setSourceIndicator(mappedString.getSourceIndicator())
+            .setTargetObjectIdentifier(mappedString.getTargetObjectIdentifier())
+            .setTargetObjectName(targetObjNameMap)
+            .setTargetObjectType(mappedString.getTargetObjectType())
+            .setTcaInfo(mappedString.getTcaInfo() == null ? null
+                : new TcaInfoBuilder(mappedString.getTcaInfo()).build())
+            .setUuid(mappedString.getUuid())
+            .build();
+    }
+}
+
index 6960ab58a2218487a5e9b7e7f74c9a9f424ef4e4..ea596de38d1e275af3eb28333a127f6adf373f3f 100644 (file)
@@ -95,6 +95,18 @@ public final class TopicManager {
         }
     }
 
+    public void deleteTapiTopic(String topic) {
+        if (!tapiPublisherMap.containsKey(topic)) {
+            LOG.info("Tapi topic: {} doesnt exist", topic);
+            return;
+        }
+        LOG.info("Deleting tapi topic: {}", topic);
+        tapiPublisherMap.remove(topic);
+        if (this.nbiNotificationsListener != null) {
+            this.nbiNotificationsListener.setTapiPublishersMap(tapiPublisherMap);
+        }
+    }
+
     public Map<String, Publisher<NotificationTapiService>> getTapiTopicMap() {
         return this.tapiPublisherMap;
     }
index 3ba5258240f1c09ce1f325d2985dbf9ccbc7b19d..018c2d62257796d9afdf3bc7d09ec4c8717b1730 100644 (file)
@@ -27,6 +27,7 @@ import org.opendaylight.yang.gen.v1.nbi.notifications.rev211013.GetNotifications
 import org.opendaylight.yang.gen.v1.nbi.notifications.rev211013.GetNotificationsProcessServiceOutput;
 import org.opendaylight.yang.gen.v1.nbi.notifications.rev211013.NotificationAlarmService;
 import org.opendaylight.yang.gen.v1.nbi.notifications.rev211013.NotificationProcessService;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev211013.NotificationTapiService;
 import org.opendaylight.yangtools.yang.common.RpcResult;
 
 public class NbiNotificationsImplTest extends AbstractTest {
@@ -44,8 +45,11 @@ public class NbiNotificationsImplTest extends AbstractTest {
                 getDataStoreContextUtil().getBindingDOMCodecServices());
         JsonStringConverter<NotificationAlarmService> converterAlarm = new JsonStringConverter<>(
                 getDataStoreContextUtil().getBindingDOMCodecServices());
-        nbiNotificationsImpl = new NbiNotificationsImpl(converter, converterAlarm,"localhost:8080",
-            networkTransactionService, topicManager);
+        JsonStringConverter<NotificationTapiService> converterTapi = new JsonStringConverter<>(
+            getDataStoreContextUtil().getBindingDOMCodecServices());
+
+        nbiNotificationsImpl = new NbiNotificationsImpl(converter, converterAlarm, converterTapi,
+            "localhost:8080", networkTransactionService, topicManager);
     }
 
     @Test