Refactor nbinotifications
[transportpce.git] / nbinotifications / src / main / java / org / opendaylight / transportpce / nbinotifications / impl / rpc / GetNotificationListImpl.java
1 /*
2  * Copyright © 2024 Orange, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.transportpce.nbinotifications.impl.rpc;
9
10 import com.google.common.util.concurrent.ListenableFuture;
11 import java.util.ArrayList;
12 import java.util.HashMap;
13 import java.util.List;
14 import java.util.Map;
15 import java.util.NoSuchElementException;
16 import java.util.Optional;
17 import java.util.concurrent.ExecutionException;
18 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
19 import org.opendaylight.transportpce.common.converter.JsonStringConverter;
20 import org.opendaylight.transportpce.common.network.NetworkTransactionService;
21 import org.opendaylight.transportpce.nbinotifications.consumer.Subscriber;
22 import org.opendaylight.transportpce.nbinotifications.serialization.TapiNotificationDeserializer;
23 import org.opendaylight.transportpce.nbinotifications.utils.TopicManager;
24 import org.opendaylight.yang.gen.v1.nbi.notifications.rev230728.NotificationTapiService;
25 import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.common.rev221121.Context;
26 import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.common.rev221121.Uuid;
27 import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev221121.Context1;
28 import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev221121.GetNotificationList;
29 import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev221121.GetNotificationListInput;
30 import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev221121.GetNotificationListOutput;
31 import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev221121.GetNotificationListOutputBuilder;
32 import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev221121.context.NotificationContext;
33 import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev221121.get.notification.list.output.Notification;
34 import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev221121.get.notification.list.output.NotificationKey;
35 import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev221121.notification.context.NotifSubscription;
36 import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev221121.notification.context.NotifSubscriptionKey;
37 import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev221121.notification.subscription.service.SubscriptionFilter;
38 import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev221121.notification.subscription.service.SubscriptionFilterKey;
39 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
40 import org.opendaylight.yangtools.yang.common.ErrorType;
41 import org.opendaylight.yangtools.yang.common.RpcResult;
42 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
43 import org.slf4j.Logger;
44 import org.slf4j.LoggerFactory;
45
46
47 public class GetNotificationListImpl implements GetNotificationList {
48     private static final Logger LOG = LoggerFactory.getLogger(GetNotificationListImpl.class);
49
50     private final JsonStringConverter<NotificationTapiService> converterTapiService;
51     private final String server;
52     private final NetworkTransactionService networkTransactionService;
53     private final TopicManager topicManager;
54
55     public GetNotificationListImpl(JsonStringConverter<NotificationTapiService> converterTapiService, String server,
56             NetworkTransactionService networkTransactionService, TopicManager topicManager) {
57         this.converterTapiService = converterTapiService;
58         this.server = server;
59         this.networkTransactionService = networkTransactionService;
60         this.topicManager = topicManager;
61     }
62
63
64     @Override
65     public ListenableFuture<RpcResult<GetNotificationListOutput>> invoke(GetNotificationListInput input) {
66         try {
67             LOG.info("RPC getNotificationList received");
68             if (input == null || input.getSubscriptionId() == null) {
69                 LOG.warn("Missing mandatory params for input {}", input);
70                 return RpcResultBuilder.<GetNotificationListOutput>failed()
71                     .withError(ErrorType.RPC, "Missing input parameters")
72                     .buildFuture();
73             }
74             Uuid notifSubsUuid = input.getSubscriptionId();
75             Optional<NotifSubscription> optionalNotifSub = this.networkTransactionService
76                 .read(
77                     LogicalDatastoreType.OPERATIONAL,
78                     InstanceIdentifier.builder(Context.class).augmentation(Context1.class)
79                         .child(NotificationContext.class)
80                         .child(NotifSubscription.class, new NotifSubscriptionKey(notifSubsUuid))
81                         .build())
82                 .get();
83             if (optionalNotifSub.isEmpty()) {
84                 return RpcResultBuilder.<GetNotificationListOutput>failed()
85                     .withError(ErrorType.APPLICATION, "Notification subscription doesnt exist")
86                     .buildFuture();
87             }
88             NotifSubscription notifSubscription = optionalNotifSub.orElseThrow();
89             List<Notification> notificationTapiList = new ArrayList<>();
90             for (Map.Entry<SubscriptionFilterKey, SubscriptionFilter> sfEntry :
91                     notifSubscription.getSubscriptionFilter().entrySet()) {
92                 for (Uuid objectUuid:sfEntry.getValue().getRequestedObjectIdentifier()) {
93                     if (!this.topicManager.getTapiTopicMap().containsKey(objectUuid.getValue())) {
94                         LOG.warn("Topic doesnt exist for {}", objectUuid.getValue());
95                         continue;
96                     }
97                     LOG.info("Going to get notifications for topic {}", objectUuid.getValue());
98                     Subscriber<NotificationTapiService, Notification> subscriber = new Subscriber<>(
99                         objectUuid.getValue(), objectUuid.getValue(), server, converterTapiService,
100                         TapiNotificationDeserializer.class);
101                     notificationTapiList.addAll(subscriber.subscribe(objectUuid.getValue(), Notification.QNAME));
102                 }
103             }
104             LOG.info("TAPI notifications = {}", notificationTapiList);
105             Map<NotificationKey, Notification> notificationMap = new HashMap<>();
106             for (Notification notif:notificationTapiList) {
107                 notificationMap.put(notif.key(), notif);
108             }
109             return RpcResultBuilder
110                 .success(new GetNotificationListOutputBuilder().setNotification(notificationMap).build())
111                 .buildFuture();
112         } catch (InterruptedException | ExecutionException | NoSuchElementException e) {
113             LOG.error("Failed to get Notifications from Kafka", e);
114         }
115         return RpcResultBuilder.<GetNotificationListOutput>failed()
116             .withError(ErrorType.APPLICATION, "Notifications couldnt be retrieved from Kafka server")
117             .buildFuture();
118     }
119
120 }