X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;ds=sidebyside;f=nbinotifications%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Ftransportpce%2Fnbinotifications%2Fimpl%2FNbiNotificationsImpl.java;h=91e71eca11a34a8af144c342944cfececb10cac4;hb=942384f299eecdf5163014ff7930b12ce26a2ca3;hp=073ce49f8315e5b5cf3905ad5d70fff5f3736287;hpb=56ad6406faba3227e19a6f609325c8b9dbaca3fb;p=transportpce.git diff --git a/nbinotifications/src/main/java/org/opendaylight/transportpce/nbinotifications/impl/NbiNotificationsImpl.java b/nbinotifications/src/main/java/org/opendaylight/transportpce/nbinotifications/impl/NbiNotificationsImpl.java index 073ce49f8..91e71eca1 100644 --- a/nbinotifications/src/main/java/org/opendaylight/transportpce/nbinotifications/impl/NbiNotificationsImpl.java +++ b/nbinotifications/src/main/java/org/opendaylight/transportpce/nbinotifications/impl/NbiNotificationsImpl.java @@ -7,45 +7,479 @@ */ package org.opendaylight.transportpce.nbinotifications.impl; +import com.google.common.collect.ImmutableClassToInstanceMap; import com.google.common.util.concurrent.ListenableFuture; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; import java.util.List; +import java.util.Map; +import java.util.NoSuchElementException; +import java.util.Optional; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.ExecutionException; +import org.opendaylight.mdsal.common.api.LogicalDatastoreType; import org.opendaylight.transportpce.common.converter.JsonStringConverter; +import org.opendaylight.transportpce.common.network.NetworkTransactionService; import org.opendaylight.transportpce.nbinotifications.consumer.Subscriber; -import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.GetNotificationsServiceInput; -import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.GetNotificationsServiceOutput; -import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.GetNotificationsServiceOutputBuilder; -import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.NbiNotificationsService; -import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.get.notifications.service.output.NotificationService; +import org.opendaylight.transportpce.nbinotifications.serialization.NotificationAlarmServiceDeserializer; +import org.opendaylight.transportpce.nbinotifications.serialization.NotificationServiceDeserializer; +import org.opendaylight.transportpce.nbinotifications.serialization.TapiNotificationDeserializer; +import org.opendaylight.transportpce.nbinotifications.utils.TopicManager; +import org.opendaylight.yang.gen.v1.nbi.notifications.rev230728.GetNotificationsAlarmService; +import org.opendaylight.yang.gen.v1.nbi.notifications.rev230728.GetNotificationsAlarmServiceInput; +import org.opendaylight.yang.gen.v1.nbi.notifications.rev230728.GetNotificationsAlarmServiceOutput; +import org.opendaylight.yang.gen.v1.nbi.notifications.rev230728.GetNotificationsAlarmServiceOutputBuilder; +import org.opendaylight.yang.gen.v1.nbi.notifications.rev230728.GetNotificationsProcessService; +import org.opendaylight.yang.gen.v1.nbi.notifications.rev230728.GetNotificationsProcessServiceInput; +import org.opendaylight.yang.gen.v1.nbi.notifications.rev230728.GetNotificationsProcessServiceOutput; +import org.opendaylight.yang.gen.v1.nbi.notifications.rev230728.GetNotificationsProcessServiceOutputBuilder; +import org.opendaylight.yang.gen.v1.nbi.notifications.rev230728.NbiNotificationsService; +import org.opendaylight.yang.gen.v1.nbi.notifications.rev230728.NotificationAlarmService; +import org.opendaylight.yang.gen.v1.nbi.notifications.rev230728.NotificationProcessService; +import org.opendaylight.yang.gen.v1.nbi.notifications.rev230728.NotificationTapiService; +import org.opendaylight.yang.gen.v1.nbi.notifications.rev230728.get.notifications.alarm.service.output.NotificationsAlarmService; +import org.opendaylight.yang.gen.v1.nbi.notifications.rev230728.get.notifications.process.service.output.NotificationsProcessService; +import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.common.rev221121.Context; +import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.common.rev221121.OBJECTTYPE; +import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.common.rev221121.OBJECTTYPEPROFILE; +import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.common.rev221121.OBJECTTYPESERVICEINTERFACEPOINT; +import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.common.rev221121.OBJECTTYPETAPICONTEXT; +import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.common.rev221121.Uuid; +import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev221121.Context1; +import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev221121.CreateNotificationSubscriptionService; +import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev221121.CreateNotificationSubscriptionServiceInput; +import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev221121.CreateNotificationSubscriptionServiceOutput; +import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev221121.CreateNotificationSubscriptionServiceOutputBuilder; +import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev221121.DeleteNotificationSubscriptionService; +import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev221121.DeleteNotificationSubscriptionServiceInput; +import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev221121.DeleteNotificationSubscriptionServiceOutput; +import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev221121.DeleteNotificationSubscriptionServiceOutputBuilder; +import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev221121.GetNotificationList; +import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev221121.GetNotificationListInput; +import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev221121.GetNotificationListOutput; +import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev221121.GetNotificationListOutputBuilder; +import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev221121.GetNotificationSubscriptionServiceDetails; +import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev221121.GetNotificationSubscriptionServiceDetailsInput; +import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev221121.GetNotificationSubscriptionServiceDetailsOutput; +import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev221121.GetNotificationSubscriptionServiceDetailsOutputBuilder; +import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev221121.GetNotificationSubscriptionServiceList; +import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev221121.GetNotificationSubscriptionServiceListInput; +import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev221121.GetNotificationSubscriptionServiceListOutput; +import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev221121.GetNotificationSubscriptionServiceListOutputBuilder; +import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev221121.GetSupportedNotificationTypes; +import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev221121.GetSupportedNotificationTypesInput; +import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev221121.GetSupportedNotificationTypesOutput; +import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev221121.GetSupportedNotificationTypesOutputBuilder; +import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev221121.NOTIFICATIONTYPE; +import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev221121.NOTIFICATIONTYPEATTRIBUTEVALUECHANGE; +import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev221121.NOTIFICATIONTYPEOBJECTCREATION; +import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev221121.NOTIFICATIONTYPEOBJECTDELETION; +import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev221121.TapiNotificationService; +import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev221121.UpdateNotificationSubscriptionService; +import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev221121.UpdateNotificationSubscriptionServiceInput; +import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev221121.UpdateNotificationSubscriptionServiceOutput; +import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev221121.context.NotificationContext; +import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev221121.context.NotificationContextBuilder; +import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev221121.create.notification.subscription.service.output.SubscriptionService; +import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev221121.create.notification.subscription.service.output.SubscriptionServiceBuilder; +import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev221121.get.notification.list.output.Notification; +import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev221121.get.notification.list.output.NotificationKey; +import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev221121.get.notification.subscription.service.list.output.SubscriptionServiceKey; +import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev221121.notification.context.NotifSubscription; +import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev221121.notification.context.NotifSubscriptionBuilder; +import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev221121.notification.context.NotifSubscriptionKey; +import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev221121.notification.subscription.service.SubscriptionFilter; +import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev221121.notification.subscription.service.SubscriptionFilterBuilder; +import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev221121.notification.subscription.service.SubscriptionFilterKey; +import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; +import org.opendaylight.yangtools.yang.binding.Rpc; +import org.opendaylight.yangtools.yang.common.ErrorTag; +import org.opendaylight.yangtools.yang.common.ErrorType; import org.opendaylight.yangtools.yang.common.RpcResult; import org.opendaylight.yangtools.yang.common.RpcResultBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class NbiNotificationsImpl implements NbiNotificationsService { +public class NbiNotificationsImpl implements NbiNotificationsService, TapiNotificationService { private static final Logger LOG = LoggerFactory.getLogger(NbiNotificationsImpl.class); - private final JsonStringConverter converter; + private final JsonStringConverter converterService; + private final JsonStringConverter converterAlarmService; + private final JsonStringConverter converterTapiService; private final String server; + private final NetworkTransactionService networkTransactionService; + private final TopicManager topicManager; - public NbiNotificationsImpl(JsonStringConverter converter, String server) { - this.converter = converter; + public NbiNotificationsImpl(JsonStringConverter converterService, + JsonStringConverter converterAlarmService, + JsonStringConverter converterTapiService, String server, + NetworkTransactionService networkTransactionService, TopicManager topicManager) { + this.converterService = converterService; + this.converterAlarmService = converterAlarmService; + this.converterTapiService = converterTapiService; this.server = server; + this.networkTransactionService = networkTransactionService; + this.topicManager = topicManager; } @Override - public ListenableFuture> getNotificationsService( - GetNotificationsServiceInput input) { + public ListenableFuture> getNotificationsProcessService( + GetNotificationsProcessServiceInput input) { LOG.info("RPC getNotificationsService received"); if (input == null || input.getIdConsumer() == null || input.getGroupId() == null) { LOG.warn("Missing mandatory params for input {}", input); - return RpcResultBuilder.success(new GetNotificationsServiceOutputBuilder().build()).buildFuture(); + return RpcResultBuilder.success(new GetNotificationsProcessServiceOutputBuilder().build()).buildFuture(); } - Subscriber subscriber = new Subscriber(input.getIdConsumer(), input.getGroupId(), server, converter); - List notificationServiceList = subscriber - .subscribeService(input.getConnectionType().getName()); - GetNotificationsServiceOutputBuilder output = new GetNotificationsServiceOutputBuilder() - .setNotificationService(notificationServiceList); - return RpcResultBuilder.success(output.build()).buildFuture(); + Subscriber subscriber = new Subscriber<>( + input.getIdConsumer(), input.getGroupId(), server, converterService, + NotificationServiceDeserializer.class); + List notificationServiceList = subscriber + .subscribe(input.getConnectionType().getName(), NotificationsProcessService.QNAME); + return RpcResultBuilder.success(new GetNotificationsProcessServiceOutputBuilder() + .setNotificationsProcessService(notificationServiceList).build()).buildFuture(); } + + @Override + public ListenableFuture> getNotificationsAlarmService( + GetNotificationsAlarmServiceInput input) { + LOG.info("RPC getNotificationsAlarmService received"); + if (input == null || input.getIdConsumer() == null || input.getGroupId() == null) { + LOG.warn("Missing mandatory params for input {}", input); + return RpcResultBuilder.success(new GetNotificationsAlarmServiceOutputBuilder().build()).buildFuture(); + } + Subscriber subscriber = new Subscriber<>( + input.getIdConsumer(), input.getGroupId(), server, converterAlarmService, + NotificationAlarmServiceDeserializer.class); + List notificationAlarmServiceList = subscriber + .subscribe("alarm" + input.getConnectionType().getName(), NotificationsAlarmService.QNAME); + return RpcResultBuilder.success(new GetNotificationsAlarmServiceOutputBuilder() + .setNotificationsAlarmService(notificationAlarmServiceList).build()).buildFuture(); + } + + @Override + public ListenableFuture> + getSupportedNotificationTypes(GetSupportedNotificationTypesInput input) { + NotificationContext notificationContext = getNotificationContext(); + if (notificationContext == null) { + return RpcResultBuilder.failed() + .withError(ErrorType.APPLICATION, "Couldnt get Notification Context from Datastore") + .buildFuture(); + } + //TAPI 2.4 removes supported notification types from notif-subscription list and notification-context + //No way to store what notification types are supported + //Considers that by default all notification are supported + Set notificationTypeList = new HashSet<>(); + notificationTypeList.add(NOTIFICATIONTYPEOBJECTCREATION.VALUE); + notificationTypeList.add(NOTIFICATIONTYPEOBJECTDELETION.VALUE); + notificationTypeList.add(NOTIFICATIONTYPEATTRIBUTEVALUECHANGE.VALUE); +// +// if (notificationContext.getNotifSubscription() == null) { +// return RpcResultBuilder.success(new GetSupportedNotificationTypesOutputBuilder() +// .setSupportedNotificationTypes(new HashSet<>()) +// .setSupportedObjectTypes(new HashSet<>()).build()).buildFuture(); +// } +// Set notificationTypeList = new HashSet<>(); + + //TAPI 2.4 removes supported object types from notif-subscription list and notification-context + //No way to store what object types are supported + //Considers that by default all object are supported + Set objectTypeList = new HashSet<>(); + objectTypeList.add(OBJECTTYPESERVICEINTERFACEPOINT.VALUE); + objectTypeList.add(OBJECTTYPETAPICONTEXT.VALUE); + objectTypeList.add(OBJECTTYPEPROFILE.VALUE); +// for (NotifSubscription notifSubscription:notificationContext.getNotifSubscription().values()) { +// if (notifSubscription.getSupportedNotificationTypes() != null) { +// notificationTypeList.addAll(notifSubscription.getSupportedNotificationTypes()); +// } +// if (notifSubscription.getSupportedObjectTypes() != null) { +// objectTypeList.addAll(notifSubscription.getSupportedObjectTypes()); +// } +// } + return RpcResultBuilder.success(new GetSupportedNotificationTypesOutputBuilder() + .setSupportedNotificationTypes(notificationTypeList) + .setSupportedObjectTypes(objectTypeList).build()).buildFuture(); + } + + @Override + public ListenableFuture> + createNotificationSubscriptionService(CreateNotificationSubscriptionServiceInput input) { + for (Uuid uuid:input.getSubscriptionFilter().getRequestedObjectIdentifier()) { + LOG.info("Adding T-API topic: {} to Kafka server", uuid.getValue()); + this.topicManager.addTapiTopic(uuid.getValue()); + } + SubscriptionFilter subscriptionFilter = new SubscriptionFilterBuilder() + .setName(input.getSubscriptionFilter().getName()) + .setLocalId(input.getSubscriptionFilter().getLocalId()) + .setIncludeContent(input.getSubscriptionFilter().getIncludeContent()) + .setRequestedNotificationTypes(input.getSubscriptionFilter().getRequestedNotificationTypes()) + .setRequestedLayerProtocols(input.getSubscriptionFilter().getRequestedLayerProtocols()) + .setRequestedObjectIdentifier(input.getSubscriptionFilter().getRequestedObjectIdentifier()) + .setRequestedObjectTypes(input.getSubscriptionFilter().getRequestedObjectTypes()) + .build(); + Uuid notifSubscriptionUuid = new Uuid(UUID.randomUUID().toString()); + Map sfmap = new HashMap<>(); + sfmap.put(subscriptionFilter.key(), subscriptionFilter); + SubscriptionService subscriptionService = new SubscriptionServiceBuilder() + .setSubscriptionFilter(sfmap) + .setSubscriptionState(input.getSubscriptionState()) + .setUuid(notifSubscriptionUuid) + .build(); + + NotifSubscriptionKey notifSubscriptionKey = new NotifSubscriptionKey(notifSubscriptionUuid); + NotifSubscription notifSubscription = new NotifSubscriptionBuilder() + .setSubscriptionState(subscriptionService.getSubscriptionState()) + .setSubscriptionFilter(subscriptionService.getSubscriptionFilter()) + .setUuid(notifSubscriptionUuid) +// Following 2 items are no more in notification-context with T-API 2.4 +// .setSupportedNotificationTypes(notificationTypes) +// .setSupportedObjectTypes(objectTypes) + .setName(subscriptionService.getName()) + .build(); + NotificationContext notificationContext = getNotificationContext(); + Map notifSubscriptions = new HashMap<>(); + if (notificationContext != null && notificationContext.getNotifSubscription() != null) { + notifSubscriptions.putAll(notificationContext.getNotifSubscription()); + } + notifSubscriptions.put(notifSubscriptionKey, notifSubscription); + NotificationContext notificationContext1 = new NotificationContextBuilder() + .setNotification(notificationContext == null ? new HashMap<>() : notificationContext.getNotification()) + .setNotifSubscription(notifSubscriptions) + .build(); + if (!updateNotificationContext(notificationContext1)) { + LOG.error("Failed to update Notification context"); + return RpcResultBuilder.failed() + .withError(ErrorType.RPC, "Failed to update notification context").buildFuture(); + } + CreateNotificationSubscriptionServiceOutput serviceOutput = + new CreateNotificationSubscriptionServiceOutputBuilder() + .setSubscriptionService(subscriptionService) + .build(); + return RpcResultBuilder.success(serviceOutput).buildFuture(); + } + + @Override + public ListenableFuture> + updateNotificationSubscriptionService(UpdateNotificationSubscriptionServiceInput input) { + // TODO --> Not yet implemented + return RpcResultBuilder.failed() + .withError(ErrorType.RPC, ErrorTag.OPERATION_NOT_SUPPORTED, "RPC not implemented yet") + .buildFuture(); + } + + @Override + public ListenableFuture> + deleteNotificationSubscriptionService(DeleteNotificationSubscriptionServiceInput input) { + try { + if (input == null || input.getUuid() == null) { + LOG.warn("Missing mandatory params for input {}", input); + return RpcResultBuilder.failed() + .withError(ErrorType.RPC, "Missing input parameters").buildFuture(); + } + Uuid notifSubsUuid = input.getUuid(); + InstanceIdentifier notifSubscriptionIID = InstanceIdentifier.builder(Context.class) + .augmentation(Context1.class).child(NotificationContext.class).child(NotifSubscription.class, + new NotifSubscriptionKey(notifSubsUuid)).build(); + Optional optionalNotifSub = this.networkTransactionService.read( + LogicalDatastoreType.OPERATIONAL, notifSubscriptionIID).get(); + + if (optionalNotifSub.isEmpty()) { + return RpcResultBuilder.failed() + .withError(ErrorType.APPLICATION, + "Notification subscription doesnt exist").buildFuture(); + } + NotifSubscription notifSubscription = optionalNotifSub.orElseThrow(); + this.networkTransactionService.delete(LogicalDatastoreType.OPERATIONAL, notifSubscriptionIID); + this.networkTransactionService.commit().get(); + for (Map.Entry sfEntry : notifSubscription + .getSubscriptionFilter().entrySet()) { + for (Uuid objectUuid:sfEntry.getValue().getRequestedObjectIdentifier()) { + this.topicManager.deleteTapiTopic(objectUuid.getValue()); + } + } +// for (Uuid objectUuid:notifSubscription.getSubscriptionFilter().getRequestedObjectIdentifier()) { +// this.topicManager.deleteTapiTopic(objectUuid.getValue()); +// } + return RpcResultBuilder.success(new DeleteNotificationSubscriptionServiceOutputBuilder().build()) + .buildFuture(); + } catch (InterruptedException | ExecutionException | NoSuchElementException e) { + LOG.error("Failed to delete Notification subscription service", e); + } + return RpcResultBuilder.failed() + .withError(ErrorType.APPLICATION, + "Failed to delete notification subscription service").buildFuture(); + } + + @Override + public ListenableFuture> + getNotificationSubscriptionServiceDetails(GetNotificationSubscriptionServiceDetailsInput input) { + if (input == null || input.getUuid() == null) { + LOG.warn("Missing mandatory params for input {}", input); + return RpcResultBuilder.failed() + .withError(ErrorType.RPC, "Missing input parameters").buildFuture(); + } + Uuid notifSubsUuid = input.getUuid(); + NotificationContext notificationContext = getNotificationContext(); + if (notificationContext == null) { + return RpcResultBuilder.failed() + .withError(ErrorType.APPLICATION, "Notification context is empty") + .buildFuture(); + } + if (notificationContext.getNotifSubscription() == null) { + return RpcResultBuilder.success(new GetNotificationSubscriptionServiceDetailsOutputBuilder() + .setSubscriptionService(new org.opendaylight.yang.gen.v1 + .urn.onf.otcc.yang.tapi.notification.rev221121.get.notification.subscription.service + .details.output.SubscriptionServiceBuilder().build()).build()).buildFuture(); + } + if (!notificationContext.getNotifSubscription().containsKey(new NotifSubscriptionKey(notifSubsUuid))) { + return RpcResultBuilder.failed() + .withError(ErrorType.APPLICATION, + "Notification subscription service doesnt exist").buildFuture(); + } + return RpcResultBuilder.success(new GetNotificationSubscriptionServiceDetailsOutputBuilder() + .setSubscriptionService(new org.opendaylight.yang.gen.v1.urn + .onf.otcc.yang.tapi.notification.rev221121.get.notification.subscription.service.details.output + .SubscriptionServiceBuilder(notificationContext.getNotifSubscription().get( + new NotifSubscriptionKey(notifSubsUuid))).build()).build()).buildFuture(); + } + + @Override + public ListenableFuture> + getNotificationSubscriptionServiceList(GetNotificationSubscriptionServiceListInput input) { + NotificationContext notificationContext = getNotificationContext(); + if (notificationContext == null) { + return RpcResultBuilder.failed() + .withError(ErrorType.APPLICATION, "Notification context is empty") + .buildFuture(); + } + if (notificationContext.getNotifSubscription() == null) { + return RpcResultBuilder.success(new GetNotificationSubscriptionServiceListOutputBuilder() + .setSubscriptionService(new HashMap<>()).build()).buildFuture(); + } + Map + notifSubsMap = new HashMap<>(); + for (NotifSubscription notifSubscription:notificationContext.getNotifSubscription().values()) { + org.opendaylight.yang.gen.v1.urn.onf.otcc.yang + .tapi.notification.rev221121.get.notification.subscription.service.list.output.SubscriptionService + subscriptionService = new org.opendaylight.yang.gen.v1 + .urn.onf.otcc.yang.tapi.notification.rev221121.get.notification.subscription.service + .list.output.SubscriptionServiceBuilder(notifSubscription).build(); + notifSubsMap.put(subscriptionService.key(), subscriptionService); + } + return RpcResultBuilder.success(new GetNotificationSubscriptionServiceListOutputBuilder() + .setSubscriptionService(notifSubsMap).build()).buildFuture(); + } + + @Override + public ListenableFuture> getNotificationList(GetNotificationListInput input) { + try { + LOG.info("RPC getNotificationList received"); + if (input == null || input.getSubscriptionId() == null) { + LOG.warn("Missing mandatory params for input {}", input); + return RpcResultBuilder.failed().withError(ErrorType.RPC, + "Missing input parameters").buildFuture(); + } + Uuid notifSubsUuid = input.getSubscriptionId(); + InstanceIdentifier notifSubscriptionIID = InstanceIdentifier.builder(Context.class) + .augmentation(Context1.class).child(NotificationContext.class).child(NotifSubscription.class, + new NotifSubscriptionKey(notifSubsUuid)).build(); + Optional optionalNotifSub = this.networkTransactionService.read( + LogicalDatastoreType.OPERATIONAL, notifSubscriptionIID).get(); + + if (optionalNotifSub.isEmpty()) { + return RpcResultBuilder.failed() + .withError(ErrorType.APPLICATION, + "Notification subscription doesnt exist").buildFuture(); + } + NotifSubscription notifSubscription = optionalNotifSub.orElseThrow(); + List notificationTapiList = new ArrayList<>(); + for (Map.Entry sfEntry : notifSubscription + .getSubscriptionFilter().entrySet()) { + for (Uuid objectUuid:sfEntry.getValue().getRequestedObjectIdentifier()) { + if (!this.topicManager.getTapiTopicMap().containsKey(objectUuid.getValue())) { + LOG.warn("Topic doesnt exist for {}", objectUuid.getValue()); + continue; + } + LOG.info("Going to get notifications for topic {}", objectUuid.getValue()); + Subscriber subscriber = new Subscriber<>( + objectUuid.getValue(), objectUuid.getValue(), server, converterTapiService, + TapiNotificationDeserializer.class); + notificationTapiList.addAll(subscriber.subscribe(objectUuid.getValue(), Notification.QNAME)); + } + } +// for (Uuid objectUuid:notifSubscription.getSubscriptionFilter().getRequestedObjectIdentifier()) { +// if (!this.topicManager.getTapiTopicMap().containsKey(objectUuid.getValue())) { +// LOG.warn("Topic doesnt exist for {}", objectUuid.getValue()); +// continue; +// } +// LOG.info("Going to get notifications for topic {}", objectUuid.getValue()); +// Subscriber subscriber = new Subscriber<>( +// objectUuid.getValue(), objectUuid.getValue(), server, converterTapiService, +// TapiNotificationDeserializer.class); +// notificationTapiList.addAll(subscriber.subscribe(objectUuid.getValue(), Notification.QNAME)); +// } + LOG.info("TAPI notifications = {}", notificationTapiList); + Map notificationMap = new HashMap<>(); + for (Notification notif:notificationTapiList) { + notificationMap.put(notif.key(), notif); + } + return RpcResultBuilder.success(new GetNotificationListOutputBuilder() + .setNotification(notificationMap).build()).buildFuture(); + } catch (InterruptedException | ExecutionException | NoSuchElementException e) { + LOG.error("Failed to get Notifications from Kafka", e); + } + return RpcResultBuilder.failed() + .withError(ErrorType.APPLICATION, + "Notifications couldnt be retrieved from Kafka server").buildFuture(); + } + + public ImmutableClassToInstanceMap> registerRPCs() { + return ImmutableClassToInstanceMap.>builder() + .put(GetNotificationsProcessService.class, this::getNotificationsProcessService) + .put(GetNotificationsAlarmService.class, this::getNotificationsAlarmService) + .put(GetSupportedNotificationTypes.class, this::getSupportedNotificationTypes) + .put(CreateNotificationSubscriptionService.class, this::createNotificationSubscriptionService) + .put(UpdateNotificationSubscriptionService.class, this::updateNotificationSubscriptionService) + .put(DeleteNotificationSubscriptionService.class, this::deleteNotificationSubscriptionService) + .put(GetNotificationSubscriptionServiceDetails.class, this::getNotificationSubscriptionServiceDetails) + .put(GetNotificationSubscriptionServiceList.class, this::getNotificationSubscriptionServiceList) + .put(GetNotificationList.class, this::getNotificationList) + .build(); + } + + private NotificationContext getNotificationContext() { + LOG.info("Getting tapi notification context"); + try { + InstanceIdentifier notificationcontextIID = + InstanceIdentifier.builder(Context.class).augmentation(Context1.class) + .child(NotificationContext.class).build(); + Optional notificationContextOptional + = this.networkTransactionService.read(LogicalDatastoreType.OPERATIONAL, notificationcontextIID).get(); + if (!notificationContextOptional.isPresent()) { + LOG.error("Could not get TAPI notification context"); + return null; + } + return notificationContextOptional.orElseThrow(); + } catch (InterruptedException | ExecutionException e) { + LOG.error("Could not get TAPI notification context"); + } + return null; + } + + private boolean updateNotificationContext(NotificationContext notificationContext1) { + try { + InstanceIdentifier notificationcontextIID = + InstanceIdentifier.builder(Context.class).augmentation(Context1.class) + .child(NotificationContext.class).build(); + this.networkTransactionService.merge(LogicalDatastoreType.OPERATIONAL, notificationcontextIID, + notificationContext1); + this.networkTransactionService.commit().get(); + return true; + } catch (InterruptedException | ExecutionException e) { + LOG.error("Could not update TAPI notification context"); + } + return false; + } + }