- @Override
- public ListenableFuture<RpcResult<GetNotificationsProcessServiceOutput>> getNotificationsProcessService(
- GetNotificationsProcessServiceInput input) {
- LOG.info("RPC getNotificationsService received");
- if (input == null || input.getIdConsumer() == null || input.getGroupId() == null) {
- LOG.warn("Missing mandatory params for input {}", input);
- return RpcResultBuilder.success(new GetNotificationsProcessServiceOutputBuilder().build()).buildFuture();
- }
- Subscriber<NotificationProcessService, NotificationsProcessService> subscriber = new Subscriber<>(
- input.getIdConsumer(), input.getGroupId(), server, converterService,
- NotificationServiceDeserializer.class);
- List<NotificationsProcessService> notificationServiceList = subscriber
- .subscribe(input.getConnectionType().getName(), NotificationsProcessService.QNAME);
- return RpcResultBuilder.success(new GetNotificationsProcessServiceOutputBuilder()
- .setNotificationsProcessService(notificationServiceList).build()).buildFuture();
- }
-
- @Override
- public ListenableFuture<RpcResult<GetNotificationsAlarmServiceOutput>> getNotificationsAlarmService(
- GetNotificationsAlarmServiceInput input) {
- LOG.info("RPC getNotificationsAlarmService received");
- if (input == null || input.getIdConsumer() == null || input.getGroupId() == null) {
- LOG.warn("Missing mandatory params for input {}", input);
- return RpcResultBuilder.success(new GetNotificationsAlarmServiceOutputBuilder().build()).buildFuture();
- }
- Subscriber<NotificationAlarmService, NotificationsAlarmService> subscriber = new Subscriber<>(
- input.getIdConsumer(), input.getGroupId(), server, converterAlarmService,
- NotificationAlarmServiceDeserializer.class);
- List<NotificationsAlarmService> notificationAlarmServiceList = subscriber
- .subscribe("alarm" + input.getConnectionType().getName(), NotificationsAlarmService.QNAME);
- return RpcResultBuilder.success(new GetNotificationsAlarmServiceOutputBuilder()
- .setNotificationsAlarmService(notificationAlarmServiceList).build()).buildFuture();
- }
-
- @Override
- public ListenableFuture<RpcResult<GetSupportedNotificationTypesOutput>>
- getSupportedNotificationTypes(GetSupportedNotificationTypesInput input) {
- NotificationContext notificationContext = getNotificationContext();
- if (notificationContext == null) {
- return RpcResultBuilder.<GetSupportedNotificationTypesOutput>failed()
- .withError(ErrorType.APPLICATION, "Couldnt get Notification Context from Datastore")
- .buildFuture();
- }
- //TAPI 2.4 removes supported notification types from notif-subscription list and notification-context
- //No way to store what notification types are supported
- //Considers that by default all notification are supported
- Set<NOTIFICATIONTYPE> notificationTypeList = new HashSet<>();
- notificationTypeList.add(NOTIFICATIONTYPEOBJECTCREATION.VALUE);
- notificationTypeList.add(NOTIFICATIONTYPEOBJECTDELETION.VALUE);
- notificationTypeList.add(NOTIFICATIONTYPEATTRIBUTEVALUECHANGE.VALUE);
-//
-// if (notificationContext.getNotifSubscription() == null) {
-// return RpcResultBuilder.success(new GetSupportedNotificationTypesOutputBuilder()
-// .setSupportedNotificationTypes(new HashSet<>())
-// .setSupportedObjectTypes(new HashSet<>()).build()).buildFuture();
-// }
-// Set<NOTIFICATIONTYPE> notificationTypeList = new HashSet<>();
-
- //TAPI 2.4 removes supported object types from notif-subscription list and notification-context
- //No way to store what object types are supported
- //Considers that by default all object are supported
- Set<OBJECTTYPE> objectTypeList = new HashSet<>();
- objectTypeList.add(OBJECTTYPESERVICEINTERFACEPOINT.VALUE);
- objectTypeList.add(OBJECTTYPETAPICONTEXT.VALUE);
- objectTypeList.add(OBJECTTYPEPROFILE.VALUE);
- objectTypeList.add(TOPOLOGYOBJECTTYPENODE.VALUE);
- objectTypeList.add(TOPOLOGYOBJECTTYPELINK.VALUE);
- objectTypeList.add(TOPOLOGYOBJECTTYPENODEEDGEPOINT.VALUE);
- objectTypeList.add(TOPOLOGYOBJECTTYPENODERULEGROUP.VALUE);
- objectTypeList.add(TOPOLOGYOBJECTTYPEINTERRULEGROUP.VALUE);
- objectTypeList.add(CONNECTIVITYOBJECTTYPE.VALUE);
- objectTypeList.add(CONNECTIVITYOBJECTTYPECONNECTIVITYSERVICE.VALUE);
- objectTypeList.add(CONNECTIVITYOBJECTTYPECONNECTIONENDPOINT.VALUE);
- objectTypeList.add(CONNECTIVITYOBJECTTYPECONNECTION.VALUE);
-// for (NotifSubscription notifSubscription:notificationContext.getNotifSubscription().values()) {
-// if (notifSubscription.getSupportedNotificationTypes() != null) {
-// notificationTypeList.addAll(notifSubscription.getSupportedNotificationTypes());
-// }
-// if (notifSubscription.getSupportedObjectTypes() != null) {
-// objectTypeList.addAll(notifSubscription.getSupportedObjectTypes());
-// }
-// }
- return RpcResultBuilder.success(new GetSupportedNotificationTypesOutputBuilder()
- .setSupportedNotificationTypes(notificationTypeList)
- .setSupportedObjectTypes(objectTypeList).build()).buildFuture();
- }
-
- @Override
- public ListenableFuture<RpcResult<CreateNotificationSubscriptionServiceOutput>>
- createNotificationSubscriptionService(CreateNotificationSubscriptionServiceInput input) {
- for (Uuid uuid:input.getSubscriptionFilter().getRequestedObjectIdentifier()) {
- LOG.info("Adding T-API topic: {} to Kafka server", uuid.getValue());
- this.topicManager.addTapiTopic(uuid.getValue());
- }
- SubscriptionFilter subscriptionFilter = new SubscriptionFilterBuilder()
- .setName(input.getSubscriptionFilter().getName())
- .setLocalId(input.getSubscriptionFilter().getLocalId())
- .setIncludeContent(input.getSubscriptionFilter().getIncludeContent())
- .setRequestedNotificationTypes(input.getSubscriptionFilter().getRequestedNotificationTypes())
- .setRequestedLayerProtocols(input.getSubscriptionFilter().getRequestedLayerProtocols())
- .setRequestedObjectIdentifier(input.getSubscriptionFilter().getRequestedObjectIdentifier())
- .setRequestedObjectTypes(input.getSubscriptionFilter().getRequestedObjectTypes())
- .build();
- Uuid notifSubscriptionUuid = new Uuid(UUID.randomUUID().toString());
- Map<SubscriptionFilterKey, SubscriptionFilter> sfmap = new HashMap<>();
- sfmap.put(subscriptionFilter.key(), subscriptionFilter);
- SubscriptionService subscriptionService = new SubscriptionServiceBuilder()
- .setSubscriptionFilter(sfmap)
- .setSubscriptionState(input.getSubscriptionState())
- .setUuid(notifSubscriptionUuid)
- .build();
-
- NotifSubscriptionKey notifSubscriptionKey = new NotifSubscriptionKey(notifSubscriptionUuid);
- NotifSubscription notifSubscription = new NotifSubscriptionBuilder()
- .setSubscriptionState(subscriptionService.getSubscriptionState())
- .setSubscriptionFilter(subscriptionService.getSubscriptionFilter())
- .setUuid(notifSubscriptionUuid)
-// Following 2 items are no more in notification-context with T-API 2.4
-// .setSupportedNotificationTypes(notificationTypes)
-// .setSupportedObjectTypes(objectTypes)
- .setName(subscriptionService.getName())
- .build();
- NotificationContext notificationContext = getNotificationContext();
- Map<NotifSubscriptionKey, NotifSubscription> notifSubscriptions = new HashMap<>();
- if (notificationContext != null && notificationContext.getNotifSubscription() != null) {
- notifSubscriptions.putAll(notificationContext.getNotifSubscription());
- }
- notifSubscriptions.put(notifSubscriptionKey, notifSubscription);
- NotificationContext notificationContext1 = new NotificationContextBuilder()
- .setNotification(notificationContext == null ? new HashMap<>() : notificationContext.getNotification())
- .setNotifSubscription(notifSubscriptions)
- .build();
- if (!updateNotificationContext(notificationContext1)) {
- LOG.error("Failed to update Notification context");
- return RpcResultBuilder.<CreateNotificationSubscriptionServiceOutput>failed()
- .withError(ErrorType.RPC, "Failed to update notification context").buildFuture();
- }
- CreateNotificationSubscriptionServiceOutput serviceOutput =
- new CreateNotificationSubscriptionServiceOutputBuilder()
- .setSubscriptionService(subscriptionService)
- .build();
- return RpcResultBuilder.success(serviceOutput).buildFuture();
- }
-
- @Override
- public ListenableFuture<RpcResult<UpdateNotificationSubscriptionServiceOutput>>
- updateNotificationSubscriptionService(UpdateNotificationSubscriptionServiceInput input) {
- // TODO --> Not yet implemented
- return RpcResultBuilder.<UpdateNotificationSubscriptionServiceOutput>failed()
- .withError(ErrorType.RPC, ErrorTag.OPERATION_NOT_SUPPORTED, "RPC not implemented yet")
- .buildFuture();
- }
-
- @Override
- public ListenableFuture<RpcResult<DeleteNotificationSubscriptionServiceOutput>>
- deleteNotificationSubscriptionService(DeleteNotificationSubscriptionServiceInput input) {
- try {
- if (input == null || input.getUuid() == null) {
- LOG.warn("Missing mandatory params for input {}", input);
- return RpcResultBuilder.<DeleteNotificationSubscriptionServiceOutput>failed()
- .withError(ErrorType.RPC, "Missing input parameters").buildFuture();
- }
- Uuid notifSubsUuid = input.getUuid();
- InstanceIdentifier<NotifSubscription> notifSubscriptionIID = InstanceIdentifier.builder(Context.class)
- .augmentation(Context1.class).child(NotificationContext.class).child(NotifSubscription.class,
- new NotifSubscriptionKey(notifSubsUuid)).build();
- Optional<NotifSubscription> optionalNotifSub = this.networkTransactionService.read(
- LogicalDatastoreType.OPERATIONAL, notifSubscriptionIID).get();
-
- if (optionalNotifSub.isEmpty()) {
- return RpcResultBuilder.<DeleteNotificationSubscriptionServiceOutput>failed()
- .withError(ErrorType.APPLICATION,
- "Notification subscription doesnt exist").buildFuture();
- }
- NotifSubscription notifSubscription = optionalNotifSub.orElseThrow();
- this.networkTransactionService.delete(LogicalDatastoreType.OPERATIONAL, notifSubscriptionIID);
- this.networkTransactionService.commit().get();
- for (Map.Entry<SubscriptionFilterKey, SubscriptionFilter> sfEntry : notifSubscription
- .getSubscriptionFilter().entrySet()) {
- for (Uuid objectUuid:sfEntry.getValue().getRequestedObjectIdentifier()) {
- this.topicManager.deleteTapiTopic(objectUuid.getValue());
- }
- }
-// for (Uuid objectUuid:notifSubscription.getSubscriptionFilter().getRequestedObjectIdentifier()) {
-// this.topicManager.deleteTapiTopic(objectUuid.getValue());
-// }
- return RpcResultBuilder.success(new DeleteNotificationSubscriptionServiceOutputBuilder().build())
- .buildFuture();
- } catch (InterruptedException | ExecutionException | NoSuchElementException e) {
- LOG.error("Failed to delete Notification subscription service", e);
- }
- return RpcResultBuilder.<DeleteNotificationSubscriptionServiceOutput>failed()
- .withError(ErrorType.APPLICATION,
- "Failed to delete notification subscription service").buildFuture();
- }
-
- @Override
- public ListenableFuture<RpcResult<GetNotificationSubscriptionServiceDetailsOutput>>
- getNotificationSubscriptionServiceDetails(GetNotificationSubscriptionServiceDetailsInput input) {
- if (input == null || input.getUuid() == null) {
- LOG.warn("Missing mandatory params for input {}", input);
- return RpcResultBuilder.<GetNotificationSubscriptionServiceDetailsOutput>failed()
- .withError(ErrorType.RPC, "Missing input parameters").buildFuture();
- }
- Uuid notifSubsUuid = input.getUuid();
- NotificationContext notificationContext = getNotificationContext();
- if (notificationContext == null) {
- return RpcResultBuilder.<GetNotificationSubscriptionServiceDetailsOutput>failed()
- .withError(ErrorType.APPLICATION, "Notification context is empty")
- .buildFuture();
- }
- if (notificationContext.getNotifSubscription() == null) {
- return RpcResultBuilder.success(new GetNotificationSubscriptionServiceDetailsOutputBuilder()
- .setSubscriptionService(new org.opendaylight.yang.gen.v1
- .urn.onf.otcc.yang.tapi.notification.rev221121.get.notification.subscription.service
- .details.output.SubscriptionServiceBuilder().build()).build()).buildFuture();
- }
- if (!notificationContext.getNotifSubscription().containsKey(new NotifSubscriptionKey(notifSubsUuid))) {
- return RpcResultBuilder.<GetNotificationSubscriptionServiceDetailsOutput>failed()
- .withError(ErrorType.APPLICATION,
- "Notification subscription service doesnt exist").buildFuture();
- }
- return RpcResultBuilder.success(new GetNotificationSubscriptionServiceDetailsOutputBuilder()
- .setSubscriptionService(new org.opendaylight.yang.gen.v1.urn
- .onf.otcc.yang.tapi.notification.rev221121.get.notification.subscription.service.details.output
- .SubscriptionServiceBuilder(notificationContext.getNotifSubscription().get(
- new NotifSubscriptionKey(notifSubsUuid))).build()).build()).buildFuture();
- }
-
- @Override
- public ListenableFuture<RpcResult<GetNotificationSubscriptionServiceListOutput>>
- getNotificationSubscriptionServiceList(GetNotificationSubscriptionServiceListInput input) {
- NotificationContext notificationContext = getNotificationContext();
- if (notificationContext == null) {
- return RpcResultBuilder.<GetNotificationSubscriptionServiceListOutput>failed()
- .withError(ErrorType.APPLICATION, "Notification context is empty")
- .buildFuture();
- }
- if (notificationContext.getNotifSubscription() == null) {
- return RpcResultBuilder.success(new GetNotificationSubscriptionServiceListOutputBuilder()
- .setSubscriptionService(new HashMap<>()).build()).buildFuture();
- }
- Map<SubscriptionServiceKey, org.opendaylight.yang.gen.v1.urn.onf.otcc.yang
- .tapi.notification.rev221121.get.notification.subscription.service.list.output.SubscriptionService>
- notifSubsMap = new HashMap<>();
- for (NotifSubscription notifSubscription:notificationContext.getNotifSubscription().values()) {
- org.opendaylight.yang.gen.v1.urn.onf.otcc.yang
- .tapi.notification.rev221121.get.notification.subscription.service.list.output.SubscriptionService
- subscriptionService = new org.opendaylight.yang.gen.v1
- .urn.onf.otcc.yang.tapi.notification.rev221121.get.notification.subscription.service
- .list.output.SubscriptionServiceBuilder(notifSubscription).build();
- notifSubsMap.put(subscriptionService.key(), subscriptionService);
- }
- return RpcResultBuilder.success(new GetNotificationSubscriptionServiceListOutputBuilder()
- .setSubscriptionService(notifSubsMap).build()).buildFuture();
- }
-
- @Override
- public ListenableFuture<RpcResult<GetNotificationListOutput>> getNotificationList(GetNotificationListInput input) {
- try {
- LOG.info("RPC getNotificationList received");
- if (input == null || input.getSubscriptionId() == null) {
- LOG.warn("Missing mandatory params for input {}", input);
- return RpcResultBuilder.<GetNotificationListOutput>failed().withError(ErrorType.RPC,
- "Missing input parameters").buildFuture();
- }
- Uuid notifSubsUuid = input.getSubscriptionId();
- InstanceIdentifier<NotifSubscription> notifSubscriptionIID = InstanceIdentifier.builder(Context.class)
- .augmentation(Context1.class).child(NotificationContext.class).child(NotifSubscription.class,
- new NotifSubscriptionKey(notifSubsUuid)).build();
- Optional<NotifSubscription> optionalNotifSub = this.networkTransactionService.read(
- LogicalDatastoreType.OPERATIONAL, notifSubscriptionIID).get();
-
- if (optionalNotifSub.isEmpty()) {
- return RpcResultBuilder.<GetNotificationListOutput>failed()
- .withError(ErrorType.APPLICATION,
- "Notification subscription doesnt exist").buildFuture();
- }
- NotifSubscription notifSubscription = optionalNotifSub.orElseThrow();
- List<Notification> notificationTapiList = new ArrayList<>();
- for (Map.Entry<SubscriptionFilterKey, SubscriptionFilter> sfEntry : notifSubscription
- .getSubscriptionFilter().entrySet()) {
- for (Uuid objectUuid:sfEntry.getValue().getRequestedObjectIdentifier()) {
- if (!this.topicManager.getTapiTopicMap().containsKey(objectUuid.getValue())) {
- LOG.warn("Topic doesnt exist for {}", objectUuid.getValue());
- continue;
- }
- LOG.info("Going to get notifications for topic {}", objectUuid.getValue());
- Subscriber<NotificationTapiService, Notification> subscriber = new Subscriber<>(
- objectUuid.getValue(), objectUuid.getValue(), server, converterTapiService,
- TapiNotificationDeserializer.class);
- notificationTapiList.addAll(subscriber.subscribe(objectUuid.getValue(), Notification.QNAME));
- }
- }
-// for (Uuid objectUuid:notifSubscription.getSubscriptionFilter().getRequestedObjectIdentifier()) {
-// if (!this.topicManager.getTapiTopicMap().containsKey(objectUuid.getValue())) {
-// LOG.warn("Topic doesnt exist for {}", objectUuid.getValue());
-// continue;
-// }
-// LOG.info("Going to get notifications for topic {}", objectUuid.getValue());
-// Subscriber<NotificationTapiService, Notification> subscriber = new Subscriber<>(
-// objectUuid.getValue(), objectUuid.getValue(), server, converterTapiService,
-// TapiNotificationDeserializer.class);
-// notificationTapiList.addAll(subscriber.subscribe(objectUuid.getValue(), Notification.QNAME));
-// }
- LOG.info("TAPI notifications = {}", notificationTapiList);
- Map<NotificationKey, Notification> notificationMap = new HashMap<>();
- for (Notification notif:notificationTapiList) {
- notificationMap.put(notif.key(), notif);
- }
- return RpcResultBuilder.success(new GetNotificationListOutputBuilder()
- .setNotification(notificationMap).build()).buildFuture();
- } catch (InterruptedException | ExecutionException | NoSuchElementException e) {
- LOG.error("Failed to get Notifications from Kafka", e);
- }
- return RpcResultBuilder.<GetNotificationListOutput>failed()
- .withError(ErrorType.APPLICATION,
- "Notifications couldnt be retrieved from Kafka server").buildFuture();
- }
-