+
+ @Override
+ public ListenableFuture<RpcResult<GetNotificationsAlarmServiceOutput>> getNotificationsAlarmService(
+ GetNotificationsAlarmServiceInput input) {
+ LOG.info("RPC getNotificationsAlarmService received");
+ if (input == null || input.getIdConsumer() == null || input.getGroupId() == null) {
+ LOG.warn("Missing mandatory params for input {}", input);
+ return RpcResultBuilder.success(new GetNotificationsAlarmServiceOutputBuilder().build()).buildFuture();
+ }
+ Subscriber<NotificationAlarmService, NotificationsAlarmService> subscriber = new Subscriber<>(
+ input.getIdConsumer(), input.getGroupId(), server, converterAlarmService,
+ NotificationAlarmServiceDeserializer.class);
+ List<NotificationsAlarmService> notificationAlarmServiceList = subscriber
+ .subscribe("alarm" + input.getConnectionType().getName(), NotificationsAlarmService.QNAME);
+ return RpcResultBuilder.success(new GetNotificationsAlarmServiceOutputBuilder()
+ .setNotificationsAlarmService(notificationAlarmServiceList).build()).buildFuture();
+ }
+
+ @Override
+ public ListenableFuture<RpcResult<GetSupportedNotificationTypesOutput>>
+ getSupportedNotificationTypes(GetSupportedNotificationTypesInput input) {
+ NotificationContext notificationContext = getNotificationContext();
+ if (notificationContext == null) {
+ return RpcResultBuilder.<GetSupportedNotificationTypesOutput>failed()
+ .withError(ErrorType.APPLICATION, "Couldnt get Notification Context from Datastore")
+ .buildFuture();
+ }
+ //TAPI 2.4 removes supported notification types from notif-subscription list and notification-context
+ //No way to store what notification types are supported
+ //Considers that by default all notification are supported
+ Set<NOTIFICATIONTYPE> notificationTypeList = new HashSet<>();
+ notificationTypeList.add(NOTIFICATIONTYPEOBJECTCREATION.VALUE);
+ notificationTypeList.add(NOTIFICATIONTYPEOBJECTDELETION.VALUE);
+ notificationTypeList.add(NOTIFICATIONTYPEATTRIBUTEVALUECHANGE.VALUE);
+//
+// if (notificationContext.getNotifSubscription() == null) {
+// return RpcResultBuilder.success(new GetSupportedNotificationTypesOutputBuilder()
+// .setSupportedNotificationTypes(new HashSet<>())
+// .setSupportedObjectTypes(new HashSet<>()).build()).buildFuture();
+// }
+// Set<NOTIFICATIONTYPE> notificationTypeList = new HashSet<>();
+
+ //TAPI 2.4 removes supported object types from notif-subscription list and notification-context
+ //No way to store what object types are supported
+ //Considers that by default all object are supported
+ Set<OBJECTTYPE> objectTypeList = new HashSet<>();
+ objectTypeList.add(OBJECTTYPESERVICEINTERFACEPOINT.VALUE);
+ objectTypeList.add(OBJECTTYPETAPICONTEXT.VALUE);
+ objectTypeList.add(OBJECTTYPEPROFILE.VALUE);
+// for (NotifSubscription notifSubscription:notificationContext.getNotifSubscription().values()) {
+// if (notifSubscription.getSupportedNotificationTypes() != null) {
+// notificationTypeList.addAll(notifSubscription.getSupportedNotificationTypes());
+// }
+// if (notifSubscription.getSupportedObjectTypes() != null) {
+// objectTypeList.addAll(notifSubscription.getSupportedObjectTypes());
+// }
+// }
+ return RpcResultBuilder.success(new GetSupportedNotificationTypesOutputBuilder()
+ .setSupportedNotificationTypes(notificationTypeList)
+ .setSupportedObjectTypes(objectTypeList).build()).buildFuture();
+ }
+
+ @Override
+ public ListenableFuture<RpcResult<CreateNotificationSubscriptionServiceOutput>>
+ createNotificationSubscriptionService(CreateNotificationSubscriptionServiceInput input) {
+ for (Uuid uuid:input.getSubscriptionFilter().getRequestedObjectIdentifier()) {
+ LOG.info("Adding T-API topic: {} to Kafka server", uuid.getValue());
+ this.topicManager.addTapiTopic(uuid.getValue());
+ }
+ SubscriptionFilter subscriptionFilter = new SubscriptionFilterBuilder()
+ .setName(input.getSubscriptionFilter().getName())
+ .setLocalId(input.getSubscriptionFilter().getLocalId())
+ .setIncludeContent(input.getSubscriptionFilter().getIncludeContent())
+ .setRequestedNotificationTypes(input.getSubscriptionFilter().getRequestedNotificationTypes())
+ .setRequestedLayerProtocols(input.getSubscriptionFilter().getRequestedLayerProtocols())
+ .setRequestedObjectIdentifier(input.getSubscriptionFilter().getRequestedObjectIdentifier())
+ .setRequestedObjectTypes(input.getSubscriptionFilter().getRequestedObjectTypes())
+ .build();
+ Uuid notifSubscriptionUuid = new Uuid(UUID.randomUUID().toString());
+ Map<SubscriptionFilterKey, SubscriptionFilter> sfmap = new HashMap<>();
+ sfmap.put(subscriptionFilter.key(), subscriptionFilter);
+ SubscriptionService subscriptionService = new SubscriptionServiceBuilder()
+ .setSubscriptionFilter(sfmap)
+ .setSubscriptionState(input.getSubscriptionState())
+ .setUuid(notifSubscriptionUuid)
+ .build();
+
+ NotifSubscriptionKey notifSubscriptionKey = new NotifSubscriptionKey(notifSubscriptionUuid);
+ NotifSubscription notifSubscription = new NotifSubscriptionBuilder()
+ .setSubscriptionState(subscriptionService.getSubscriptionState())
+ .setSubscriptionFilter(subscriptionService.getSubscriptionFilter())
+ .setUuid(notifSubscriptionUuid)
+// Following 2 items are no more in notification-context with T-API 2.4
+// .setSupportedNotificationTypes(notificationTypes)
+// .setSupportedObjectTypes(objectTypes)
+ .setName(subscriptionService.getName())
+ .build();
+ NotificationContext notificationContext = getNotificationContext();
+ Map<NotifSubscriptionKey, NotifSubscription> notifSubscriptions = new HashMap<>();
+ if (notificationContext != null && notificationContext.getNotifSubscription() != null) {
+ notifSubscriptions.putAll(notificationContext.getNotifSubscription());
+ }
+ notifSubscriptions.put(notifSubscriptionKey, notifSubscription);
+ NotificationContext notificationContext1 = new NotificationContextBuilder()
+ .setNotification(notificationContext == null ? new HashMap<>() : notificationContext.getNotification())
+ .setNotifSubscription(notifSubscriptions)
+ .build();
+ if (!updateNotificationContext(notificationContext1)) {
+ LOG.error("Failed to update Notification context");
+ return RpcResultBuilder.<CreateNotificationSubscriptionServiceOutput>failed()
+ .withError(ErrorType.RPC, "Failed to update notification context").buildFuture();
+ }
+ CreateNotificationSubscriptionServiceOutput serviceOutput =
+ new CreateNotificationSubscriptionServiceOutputBuilder()
+ .setSubscriptionService(subscriptionService)
+ .build();
+ return RpcResultBuilder.success(serviceOutput).buildFuture();
+ }
+
+ @Override
+ public ListenableFuture<RpcResult<UpdateNotificationSubscriptionServiceOutput>>
+ updateNotificationSubscriptionService(UpdateNotificationSubscriptionServiceInput input) {
+ // TODO --> Not yet implemented
+ return RpcResultBuilder.<UpdateNotificationSubscriptionServiceOutput>failed()
+ .withError(ErrorType.RPC, ErrorTag.OPERATION_NOT_SUPPORTED, "RPC not implemented yet")
+ .buildFuture();
+ }
+
+ @Override
+ public ListenableFuture<RpcResult<DeleteNotificationSubscriptionServiceOutput>>
+ deleteNotificationSubscriptionService(DeleteNotificationSubscriptionServiceInput input) {
+ try {
+ if (input == null || input.getUuid() == null) {
+ LOG.warn("Missing mandatory params for input {}", input);
+ return RpcResultBuilder.<DeleteNotificationSubscriptionServiceOutput>failed()
+ .withError(ErrorType.RPC, "Missing input parameters").buildFuture();
+ }
+ Uuid notifSubsUuid = input.getUuid();
+ InstanceIdentifier<NotifSubscription> notifSubscriptionIID = InstanceIdentifier.builder(Context.class)
+ .augmentation(Context1.class).child(NotificationContext.class).child(NotifSubscription.class,
+ new NotifSubscriptionKey(notifSubsUuid)).build();
+ Optional<NotifSubscription> optionalNotifSub = this.networkTransactionService.read(
+ LogicalDatastoreType.OPERATIONAL, notifSubscriptionIID).get();
+
+ if (optionalNotifSub.isEmpty()) {
+ return RpcResultBuilder.<DeleteNotificationSubscriptionServiceOutput>failed()
+ .withError(ErrorType.APPLICATION,
+ "Notification subscription doesnt exist").buildFuture();
+ }
+ NotifSubscription notifSubscription = optionalNotifSub.orElseThrow();
+ this.networkTransactionService.delete(LogicalDatastoreType.OPERATIONAL, notifSubscriptionIID);
+ this.networkTransactionService.commit().get();
+ for (Map.Entry<SubscriptionFilterKey, SubscriptionFilter> sfEntry : notifSubscription
+ .getSubscriptionFilter().entrySet()) {
+ for (Uuid objectUuid:sfEntry.getValue().getRequestedObjectIdentifier()) {
+ this.topicManager.deleteTapiTopic(objectUuid.getValue());
+ }
+ }
+// for (Uuid objectUuid:notifSubscription.getSubscriptionFilter().getRequestedObjectIdentifier()) {
+// this.topicManager.deleteTapiTopic(objectUuid.getValue());
+// }
+ return RpcResultBuilder.success(new DeleteNotificationSubscriptionServiceOutputBuilder().build())
+ .buildFuture();
+ } catch (InterruptedException | ExecutionException | NoSuchElementException e) {
+ LOG.error("Failed to delete Notification subscription service", e);
+ }
+ return RpcResultBuilder.<DeleteNotificationSubscriptionServiceOutput>failed()
+ .withError(ErrorType.APPLICATION,
+ "Failed to delete notification subscription service").buildFuture();
+ }
+
+ @Override
+ public ListenableFuture<RpcResult<GetNotificationSubscriptionServiceDetailsOutput>>
+ getNotificationSubscriptionServiceDetails(GetNotificationSubscriptionServiceDetailsInput input) {
+ if (input == null || input.getUuid() == null) {
+ LOG.warn("Missing mandatory params for input {}", input);
+ return RpcResultBuilder.<GetNotificationSubscriptionServiceDetailsOutput>failed()
+ .withError(ErrorType.RPC, "Missing input parameters").buildFuture();
+ }
+ Uuid notifSubsUuid = input.getUuid();
+ NotificationContext notificationContext = getNotificationContext();
+ if (notificationContext == null) {
+ return RpcResultBuilder.<GetNotificationSubscriptionServiceDetailsOutput>failed()
+ .withError(ErrorType.APPLICATION, "Notification context is empty")
+ .buildFuture();
+ }
+ if (notificationContext.getNotifSubscription() == null) {
+ return RpcResultBuilder.success(new GetNotificationSubscriptionServiceDetailsOutputBuilder()
+ .setSubscriptionService(new org.opendaylight.yang.gen.v1
+ .urn.onf.otcc.yang.tapi.notification.rev221121.get.notification.subscription.service
+ .details.output.SubscriptionServiceBuilder().build()).build()).buildFuture();
+ }
+ if (!notificationContext.getNotifSubscription().containsKey(new NotifSubscriptionKey(notifSubsUuid))) {
+ return RpcResultBuilder.<GetNotificationSubscriptionServiceDetailsOutput>failed()
+ .withError(ErrorType.APPLICATION,
+ "Notification subscription service doesnt exist").buildFuture();
+ }
+ return RpcResultBuilder.success(new GetNotificationSubscriptionServiceDetailsOutputBuilder()
+ .setSubscriptionService(new org.opendaylight.yang.gen.v1.urn
+ .onf.otcc.yang.tapi.notification.rev221121.get.notification.subscription.service.details.output
+ .SubscriptionServiceBuilder(notificationContext.getNotifSubscription().get(
+ new NotifSubscriptionKey(notifSubsUuid))).build()).build()).buildFuture();
+ }
+
+ @Override
+ public ListenableFuture<RpcResult<GetNotificationSubscriptionServiceListOutput>>
+ getNotificationSubscriptionServiceList(GetNotificationSubscriptionServiceListInput input) {
+ NotificationContext notificationContext = getNotificationContext();
+ if (notificationContext == null) {
+ return RpcResultBuilder.<GetNotificationSubscriptionServiceListOutput>failed()
+ .withError(ErrorType.APPLICATION, "Notification context is empty")
+ .buildFuture();
+ }
+ if (notificationContext.getNotifSubscription() == null) {
+ return RpcResultBuilder.success(new GetNotificationSubscriptionServiceListOutputBuilder()
+ .setSubscriptionService(new HashMap<>()).build()).buildFuture();
+ }
+ Map<SubscriptionServiceKey, org.opendaylight.yang.gen.v1.urn.onf.otcc.yang
+ .tapi.notification.rev221121.get.notification.subscription.service.list.output.SubscriptionService>
+ notifSubsMap = new HashMap<>();
+ for (NotifSubscription notifSubscription:notificationContext.getNotifSubscription().values()) {
+ org.opendaylight.yang.gen.v1.urn.onf.otcc.yang
+ .tapi.notification.rev221121.get.notification.subscription.service.list.output.SubscriptionService
+ subscriptionService = new org.opendaylight.yang.gen.v1
+ .urn.onf.otcc.yang.tapi.notification.rev221121.get.notification.subscription.service
+ .list.output.SubscriptionServiceBuilder(notifSubscription).build();
+ notifSubsMap.put(subscriptionService.key(), subscriptionService);
+ }
+ return RpcResultBuilder.success(new GetNotificationSubscriptionServiceListOutputBuilder()
+ .setSubscriptionService(notifSubsMap).build()).buildFuture();
+ }
+
+ @Override
+ public ListenableFuture<RpcResult<GetNotificationListOutput>> getNotificationList(GetNotificationListInput input) {
+ try {
+ LOG.info("RPC getNotificationList received");
+ if (input == null || input.getSubscriptionId() == null) {
+ LOG.warn("Missing mandatory params for input {}", input);
+ return RpcResultBuilder.<GetNotificationListOutput>failed().withError(ErrorType.RPC,
+ "Missing input parameters").buildFuture();
+ }
+ Uuid notifSubsUuid = input.getSubscriptionId();
+ InstanceIdentifier<NotifSubscription> notifSubscriptionIID = InstanceIdentifier.builder(Context.class)
+ .augmentation(Context1.class).child(NotificationContext.class).child(NotifSubscription.class,
+ new NotifSubscriptionKey(notifSubsUuid)).build();
+ Optional<NotifSubscription> optionalNotifSub = this.networkTransactionService.read(
+ LogicalDatastoreType.OPERATIONAL, notifSubscriptionIID).get();
+
+ if (optionalNotifSub.isEmpty()) {
+ return RpcResultBuilder.<GetNotificationListOutput>failed()
+ .withError(ErrorType.APPLICATION,
+ "Notification subscription doesnt exist").buildFuture();
+ }
+ NotifSubscription notifSubscription = optionalNotifSub.orElseThrow();
+ List<Notification> notificationTapiList = new ArrayList<>();
+ for (Map.Entry<SubscriptionFilterKey, SubscriptionFilter> sfEntry : notifSubscription
+ .getSubscriptionFilter().entrySet()) {
+ for (Uuid objectUuid:sfEntry.getValue().getRequestedObjectIdentifier()) {
+ if (!this.topicManager.getTapiTopicMap().containsKey(objectUuid.getValue())) {
+ LOG.warn("Topic doesnt exist for {}", objectUuid.getValue());
+ continue;
+ }
+ LOG.info("Going to get notifications for topic {}", objectUuid.getValue());
+ Subscriber<NotificationTapiService, Notification> subscriber = new Subscriber<>(
+ objectUuid.getValue(), objectUuid.getValue(), server, converterTapiService,
+ TapiNotificationDeserializer.class);
+ notificationTapiList.addAll(subscriber.subscribe(objectUuid.getValue(), Notification.QNAME));
+ }
+ }
+// for (Uuid objectUuid:notifSubscription.getSubscriptionFilter().getRequestedObjectIdentifier()) {
+// if (!this.topicManager.getTapiTopicMap().containsKey(objectUuid.getValue())) {
+// LOG.warn("Topic doesnt exist for {}", objectUuid.getValue());
+// continue;
+// }
+// LOG.info("Going to get notifications for topic {}", objectUuid.getValue());
+// Subscriber<NotificationTapiService, Notification> subscriber = new Subscriber<>(
+// objectUuid.getValue(), objectUuid.getValue(), server, converterTapiService,
+// TapiNotificationDeserializer.class);
+// notificationTapiList.addAll(subscriber.subscribe(objectUuid.getValue(), Notification.QNAME));
+// }
+ LOG.info("TAPI notifications = {}", notificationTapiList);
+ Map<NotificationKey, Notification> notificationMap = new HashMap<>();
+ for (Notification notif:notificationTapiList) {
+ notificationMap.put(notif.key(), notif);
+ }
+ return RpcResultBuilder.success(new GetNotificationListOutputBuilder()
+ .setNotification(notificationMap).build()).buildFuture();
+ } catch (InterruptedException | ExecutionException | NoSuchElementException e) {
+ LOG.error("Failed to get Notifications from Kafka", e);
+ }
+ return RpcResultBuilder.<GetNotificationListOutput>failed()
+ .withError(ErrorType.APPLICATION,
+ "Notifications couldnt be retrieved from Kafka server").buildFuture();
+ }
+
+ public ImmutableClassToInstanceMap<Rpc<?, ?>> registerRPCs() {
+ return ImmutableClassToInstanceMap.<Rpc<?, ?>>builder()
+ .put(GetNotificationsProcessService.class, this::getNotificationsProcessService)
+ .put(GetNotificationsAlarmService.class, this::getNotificationsAlarmService)
+ .put(GetSupportedNotificationTypes.class, this::getSupportedNotificationTypes)
+ .put(CreateNotificationSubscriptionService.class, this::createNotificationSubscriptionService)
+ .put(UpdateNotificationSubscriptionService.class, this::updateNotificationSubscriptionService)
+ .put(DeleteNotificationSubscriptionService.class, this::deleteNotificationSubscriptionService)
+ .put(GetNotificationSubscriptionServiceDetails.class, this::getNotificationSubscriptionServiceDetails)
+ .put(GetNotificationSubscriptionServiceList.class, this::getNotificationSubscriptionServiceList)
+ .put(GetNotificationList.class, this::getNotificationList)
+ .build();
+ }
+
+ private NotificationContext getNotificationContext() {
+ LOG.info("Getting tapi notification context");
+ try {
+ InstanceIdentifier<NotificationContext> notificationcontextIID =
+ InstanceIdentifier.builder(Context.class).augmentation(Context1.class)
+ .child(NotificationContext.class).build();
+ Optional<NotificationContext> notificationContextOptional
+ = this.networkTransactionService.read(LogicalDatastoreType.OPERATIONAL, notificationcontextIID).get();
+ if (!notificationContextOptional.isPresent()) {
+ LOG.error("Could not get TAPI notification context");
+ return null;
+ }
+ return notificationContextOptional.orElseThrow();
+ } catch (InterruptedException | ExecutionException e) {
+ LOG.error("Could not get TAPI notification context");
+ }
+ return null;
+ }
+
+ private boolean updateNotificationContext(NotificationContext notificationContext1) {
+ try {
+ InstanceIdentifier<NotificationContext> notificationcontextIID =
+ InstanceIdentifier.builder(Context.class).augmentation(Context1.class)
+ .child(NotificationContext.class).build();
+ this.networkTransactionService.merge(LogicalDatastoreType.OPERATIONAL, notificationcontextIID,
+ notificationContext1);
+ this.networkTransactionService.commit().get();
+ return true;
+ } catch (InterruptedException | ExecutionException e) {
+ LOG.error("Could not update TAPI notification context");
+ }
+ return false;
+ }
+