2 * Copyright © 2024 Orange, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.transportpce.nbinotifications.impl.rpc;
10 import com.google.common.util.concurrent.ListenableFuture;
11 import java.util.ArrayList;
12 import java.util.HashMap;
13 import java.util.List;
15 import java.util.NoSuchElementException;
16 import java.util.Optional;
17 import java.util.concurrent.ExecutionException;
18 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
19 import org.opendaylight.transportpce.common.converter.JsonStringConverter;
20 import org.opendaylight.transportpce.common.network.NetworkTransactionService;
21 import org.opendaylight.transportpce.nbinotifications.consumer.Subscriber;
22 import org.opendaylight.transportpce.nbinotifications.serialization.TapiNotificationDeserializer;
23 import org.opendaylight.transportpce.nbinotifications.utils.TopicManager;
24 import org.opendaylight.yang.gen.v1.nbi.notifications.rev230728.NotificationTapiService;
25 import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.common.rev221121.Context;
26 import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.common.rev221121.Uuid;
27 import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev221121.Context1;
28 import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev221121.GetNotificationList;
29 import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev221121.GetNotificationListInput;
30 import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev221121.GetNotificationListOutput;
31 import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev221121.GetNotificationListOutputBuilder;
32 import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev221121.context.NotificationContext;
33 import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev221121.get.notification.list.output.Notification;
34 import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev221121.get.notification.list.output.NotificationKey;
35 import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev221121.notification.context.NotifSubscription;
36 import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev221121.notification.context.NotifSubscriptionKey;
37 import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev221121.notification.subscription.service.SubscriptionFilter;
38 import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev221121.notification.subscription.service.SubscriptionFilterKey;
39 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
40 import org.opendaylight.yangtools.yang.common.ErrorType;
41 import org.opendaylight.yangtools.yang.common.RpcResult;
42 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
43 import org.slf4j.Logger;
44 import org.slf4j.LoggerFactory;
47 public class GetNotificationListImpl implements GetNotificationList {
48 private static final Logger LOG = LoggerFactory.getLogger(GetNotificationListImpl.class);
50 private final JsonStringConverter<NotificationTapiService> converterTapiService;
51 private final String server;
52 private final NetworkTransactionService networkTransactionService;
53 private final TopicManager topicManager;
55 public GetNotificationListImpl(JsonStringConverter<NotificationTapiService> converterTapiService, String server,
56 NetworkTransactionService networkTransactionService, TopicManager topicManager) {
57 this.converterTapiService = converterTapiService;
59 this.networkTransactionService = networkTransactionService;
60 this.topicManager = topicManager;
65 public ListenableFuture<RpcResult<GetNotificationListOutput>> invoke(GetNotificationListInput input) {
67 LOG.info("RPC getNotificationList received");
68 if (input == null || input.getSubscriptionId() == null) {
69 LOG.warn("Missing mandatory params for input {}", input);
70 return RpcResultBuilder.<GetNotificationListOutput>failed().withError(ErrorType.RPC,
71 "Missing input parameters").buildFuture();
73 Uuid notifSubsUuid = input.getSubscriptionId();
74 InstanceIdentifier<NotifSubscription> notifSubscriptionIID = InstanceIdentifier.builder(Context.class)
75 .augmentation(Context1.class).child(NotificationContext.class).child(NotifSubscription.class,
76 new NotifSubscriptionKey(notifSubsUuid)).build();
77 Optional<NotifSubscription> optionalNotifSub = this.networkTransactionService.read(
78 LogicalDatastoreType.OPERATIONAL, notifSubscriptionIID).get();
80 if (optionalNotifSub.isEmpty()) {
81 return RpcResultBuilder.<GetNotificationListOutput>failed()
82 .withError(ErrorType.APPLICATION,
83 "Notification subscription doesnt exist").buildFuture();
85 NotifSubscription notifSubscription = optionalNotifSub.orElseThrow();
86 List<Notification> notificationTapiList = new ArrayList<>();
87 for (Map.Entry<SubscriptionFilterKey, SubscriptionFilter> sfEntry : notifSubscription
88 .getSubscriptionFilter().entrySet()) {
89 for (Uuid objectUuid:sfEntry.getValue().getRequestedObjectIdentifier()) {
90 if (!this.topicManager.getTapiTopicMap().containsKey(objectUuid.getValue())) {
91 LOG.warn("Topic doesnt exist for {}", objectUuid.getValue());
94 LOG.info("Going to get notifications for topic {}", objectUuid.getValue());
95 Subscriber<NotificationTapiService, Notification> subscriber = new Subscriber<>(
96 objectUuid.getValue(), objectUuid.getValue(), server, converterTapiService,
97 TapiNotificationDeserializer.class);
98 notificationTapiList.addAll(subscriber.subscribe(objectUuid.getValue(), Notification.QNAME));
101 // for (Uuid objectUuid:notifSubscription.getSubscriptionFilter().getRequestedObjectIdentifier()) {
102 // if (!this.topicManager.getTapiTopicMap().containsKey(objectUuid.getValue())) {
103 // LOG.warn("Topic doesnt exist for {}", objectUuid.getValue());
106 // LOG.info("Going to get notifications for topic {}", objectUuid.getValue());
107 // Subscriber<NotificationTapiService, Notification> subscriber = new Subscriber<>(
108 // objectUuid.getValue(), objectUuid.getValue(), server, converterTapiService,
109 // TapiNotificationDeserializer.class);
110 // notificationTapiList.addAll(subscriber.subscribe(objectUuid.getValue(), Notification.QNAME));
112 LOG.info("TAPI notifications = {}", notificationTapiList);
113 Map<NotificationKey, Notification> notificationMap = new HashMap<>();
114 for (Notification notif:notificationTapiList) {
115 notificationMap.put(notif.key(), notif);
117 return RpcResultBuilder.success(new GetNotificationListOutputBuilder()
118 .setNotification(notificationMap).build()).buildFuture();
119 } catch (InterruptedException | ExecutionException | NoSuchElementException e) {
120 LOG.error("Failed to get Notifications from Kafka", e);
122 return RpcResultBuilder.<GetNotificationListOutput>failed()
123 .withError(ErrorType.APPLICATION,
124 "Notifications couldnt be retrieved from Kafka server").buildFuture();