2 * Copyright © 2024 Orange, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.transportpce.nbinotifications.impl.rpc;
10 import com.google.common.util.concurrent.ListenableFuture;
11 import java.util.ArrayList;
12 import java.util.HashMap;
13 import java.util.List;
15 import java.util.NoSuchElementException;
16 import java.util.Optional;
17 import java.util.concurrent.ExecutionException;
18 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
19 import org.opendaylight.transportpce.common.converter.JsonStringConverter;
20 import org.opendaylight.transportpce.common.network.NetworkTransactionService;
21 import org.opendaylight.transportpce.nbinotifications.consumer.Subscriber;
22 import org.opendaylight.transportpce.nbinotifications.serialization.TapiNotificationDeserializer;
23 import org.opendaylight.transportpce.nbinotifications.utils.TopicManager;
24 import org.opendaylight.yang.gen.v1.nbi.notifications.rev230728.NotificationTapiService;
25 import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.common.rev221121.Context;
26 import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.common.rev221121.Uuid;
27 import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev221121.Context1;
28 import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev221121.GetNotificationList;
29 import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev221121.GetNotificationListInput;
30 import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev221121.GetNotificationListOutput;
31 import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev221121.GetNotificationListOutputBuilder;
32 import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev221121.context.NotificationContext;
33 import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev221121.get.notification.list.output.Notification;
34 import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev221121.get.notification.list.output.NotificationKey;
35 import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev221121.notification.context.NotifSubscription;
36 import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev221121.notification.context.NotifSubscriptionKey;
37 import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev221121.notification.subscription.service.SubscriptionFilter;
38 import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev221121.notification.subscription.service.SubscriptionFilterKey;
39 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
40 import org.opendaylight.yangtools.yang.common.ErrorType;
41 import org.opendaylight.yangtools.yang.common.RpcResult;
42 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
43 import org.slf4j.Logger;
44 import org.slf4j.LoggerFactory;
47 public class GetNotificationListImpl implements GetNotificationList {
48 private static final Logger LOG = LoggerFactory.getLogger(GetNotificationListImpl.class);
50 private final JsonStringConverter<NotificationTapiService> converterTapiService;
51 private final String server;
52 private final NetworkTransactionService networkTransactionService;
53 private final TopicManager topicManager;
55 public GetNotificationListImpl(JsonStringConverter<NotificationTapiService> converterTapiService, String server,
56 NetworkTransactionService networkTransactionService, TopicManager topicManager) {
57 this.converterTapiService = converterTapiService;
59 this.networkTransactionService = networkTransactionService;
60 this.topicManager = topicManager;
65 public ListenableFuture<RpcResult<GetNotificationListOutput>> invoke(GetNotificationListInput input) {
67 LOG.info("RPC getNotificationList received");
68 if (input == null || input.getSubscriptionId() == null) {
69 LOG.warn("Missing mandatory params for input {}", input);
70 return RpcResultBuilder.<GetNotificationListOutput>failed()
71 .withError(ErrorType.RPC, "Missing input parameters")
74 Uuid notifSubsUuid = input.getSubscriptionId();
75 Optional<NotifSubscription> optionalNotifSub = this.networkTransactionService
77 LogicalDatastoreType.OPERATIONAL,
78 InstanceIdentifier.builder(Context.class).augmentation(Context1.class)
79 .child(NotificationContext.class)
80 .child(NotifSubscription.class, new NotifSubscriptionKey(notifSubsUuid))
83 if (optionalNotifSub.isEmpty()) {
84 return RpcResultBuilder.<GetNotificationListOutput>failed()
85 .withError(ErrorType.APPLICATION, "Notification subscription doesnt exist")
88 NotifSubscription notifSubscription = optionalNotifSub.orElseThrow();
89 List<Notification> notificationTapiList = new ArrayList<>();
90 for (Map.Entry<SubscriptionFilterKey, SubscriptionFilter> sfEntry :
91 notifSubscription.getSubscriptionFilter().entrySet()) {
92 for (Uuid objectUuid:sfEntry.getValue().getRequestedObjectIdentifier()) {
93 if (!this.topicManager.getTapiTopicMap().containsKey(objectUuid.getValue())) {
94 LOG.warn("Topic doesnt exist for {}", objectUuid.getValue());
97 LOG.info("Going to get notifications for topic {}", objectUuid.getValue());
98 Subscriber<NotificationTapiService, Notification> subscriber = new Subscriber<>(
99 objectUuid.getValue(), objectUuid.getValue(), server, converterTapiService,
100 TapiNotificationDeserializer.class);
101 notificationTapiList.addAll(subscriber.subscribe(objectUuid.getValue(), Notification.QNAME));
104 LOG.info("TAPI notifications = {}", notificationTapiList);
105 Map<NotificationKey, Notification> notificationMap = new HashMap<>();
106 for (Notification notif:notificationTapiList) {
107 notificationMap.put(notif.key(), notif);
109 return RpcResultBuilder
110 .success(new GetNotificationListOutputBuilder().setNotification(notificationMap).build())
112 } catch (InterruptedException | ExecutionException | NoSuchElementException e) {
113 LOG.error("Failed to get Notifications from Kafka", e);
115 return RpcResultBuilder.<GetNotificationListOutput>failed()
116 .withError(ErrorType.APPLICATION, "Notifications couldnt be retrieved from Kafka server")