91e71eca11a34a8af144c342944cfececb10cac4
[transportpce.git] / nbinotifications / src / main / java / org / opendaylight / transportpce / nbinotifications / impl / NbiNotificationsImpl.java
1 /*
2  * Copyright © 2020 Orange, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.transportpce.nbinotifications.impl;
9
10 import com.google.common.collect.ImmutableClassToInstanceMap;
11 import com.google.common.util.concurrent.ListenableFuture;
12 import java.util.ArrayList;
13 import java.util.HashMap;
14 import java.util.HashSet;
15 import java.util.List;
16 import java.util.Map;
17 import java.util.NoSuchElementException;
18 import java.util.Optional;
19 import java.util.Set;
20 import java.util.UUID;
21 import java.util.concurrent.ExecutionException;
22 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
23 import org.opendaylight.transportpce.common.converter.JsonStringConverter;
24 import org.opendaylight.transportpce.common.network.NetworkTransactionService;
25 import org.opendaylight.transportpce.nbinotifications.consumer.Subscriber;
26 import org.opendaylight.transportpce.nbinotifications.serialization.NotificationAlarmServiceDeserializer;
27 import org.opendaylight.transportpce.nbinotifications.serialization.NotificationServiceDeserializer;
28 import org.opendaylight.transportpce.nbinotifications.serialization.TapiNotificationDeserializer;
29 import org.opendaylight.transportpce.nbinotifications.utils.TopicManager;
30 import org.opendaylight.yang.gen.v1.nbi.notifications.rev230728.GetNotificationsAlarmService;
31 import org.opendaylight.yang.gen.v1.nbi.notifications.rev230728.GetNotificationsAlarmServiceInput;
32 import org.opendaylight.yang.gen.v1.nbi.notifications.rev230728.GetNotificationsAlarmServiceOutput;
33 import org.opendaylight.yang.gen.v1.nbi.notifications.rev230728.GetNotificationsAlarmServiceOutputBuilder;
34 import org.opendaylight.yang.gen.v1.nbi.notifications.rev230728.GetNotificationsProcessService;
35 import org.opendaylight.yang.gen.v1.nbi.notifications.rev230728.GetNotificationsProcessServiceInput;
36 import org.opendaylight.yang.gen.v1.nbi.notifications.rev230728.GetNotificationsProcessServiceOutput;
37 import org.opendaylight.yang.gen.v1.nbi.notifications.rev230728.GetNotificationsProcessServiceOutputBuilder;
38 import org.opendaylight.yang.gen.v1.nbi.notifications.rev230728.NbiNotificationsService;
39 import org.opendaylight.yang.gen.v1.nbi.notifications.rev230728.NotificationAlarmService;
40 import org.opendaylight.yang.gen.v1.nbi.notifications.rev230728.NotificationProcessService;
41 import org.opendaylight.yang.gen.v1.nbi.notifications.rev230728.NotificationTapiService;
42 import org.opendaylight.yang.gen.v1.nbi.notifications.rev230728.get.notifications.alarm.service.output.NotificationsAlarmService;
43 import org.opendaylight.yang.gen.v1.nbi.notifications.rev230728.get.notifications.process.service.output.NotificationsProcessService;
44 import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.common.rev221121.Context;
45 import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.common.rev221121.OBJECTTYPE;
46 import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.common.rev221121.OBJECTTYPEPROFILE;
47 import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.common.rev221121.OBJECTTYPESERVICEINTERFACEPOINT;
48 import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.common.rev221121.OBJECTTYPETAPICONTEXT;
49 import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.common.rev221121.Uuid;
50 import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev221121.Context1;
51 import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev221121.CreateNotificationSubscriptionService;
52 import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev221121.CreateNotificationSubscriptionServiceInput;
53 import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev221121.CreateNotificationSubscriptionServiceOutput;
54 import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev221121.CreateNotificationSubscriptionServiceOutputBuilder;
55 import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev221121.DeleteNotificationSubscriptionService;
56 import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev221121.DeleteNotificationSubscriptionServiceInput;
57 import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev221121.DeleteNotificationSubscriptionServiceOutput;
58 import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev221121.DeleteNotificationSubscriptionServiceOutputBuilder;
59 import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev221121.GetNotificationList;
60 import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev221121.GetNotificationListInput;
61 import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev221121.GetNotificationListOutput;
62 import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev221121.GetNotificationListOutputBuilder;
63 import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev221121.GetNotificationSubscriptionServiceDetails;
64 import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev221121.GetNotificationSubscriptionServiceDetailsInput;
65 import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev221121.GetNotificationSubscriptionServiceDetailsOutput;
66 import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev221121.GetNotificationSubscriptionServiceDetailsOutputBuilder;
67 import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev221121.GetNotificationSubscriptionServiceList;
68 import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev221121.GetNotificationSubscriptionServiceListInput;
69 import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev221121.GetNotificationSubscriptionServiceListOutput;
70 import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev221121.GetNotificationSubscriptionServiceListOutputBuilder;
71 import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev221121.GetSupportedNotificationTypes;
72 import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev221121.GetSupportedNotificationTypesInput;
73 import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev221121.GetSupportedNotificationTypesOutput;
74 import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev221121.GetSupportedNotificationTypesOutputBuilder;
75 import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev221121.NOTIFICATIONTYPE;
76 import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev221121.NOTIFICATIONTYPEATTRIBUTEVALUECHANGE;
77 import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev221121.NOTIFICATIONTYPEOBJECTCREATION;
78 import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev221121.NOTIFICATIONTYPEOBJECTDELETION;
79 import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev221121.TapiNotificationService;
80 import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev221121.UpdateNotificationSubscriptionService;
81 import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev221121.UpdateNotificationSubscriptionServiceInput;
82 import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev221121.UpdateNotificationSubscriptionServiceOutput;
83 import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev221121.context.NotificationContext;
84 import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev221121.context.NotificationContextBuilder;
85 import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev221121.create.notification.subscription.service.output.SubscriptionService;
86 import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev221121.create.notification.subscription.service.output.SubscriptionServiceBuilder;
87 import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev221121.get.notification.list.output.Notification;
88 import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev221121.get.notification.list.output.NotificationKey;
89 import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev221121.get.notification.subscription.service.list.output.SubscriptionServiceKey;
90 import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev221121.notification.context.NotifSubscription;
91 import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev221121.notification.context.NotifSubscriptionBuilder;
92 import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev221121.notification.context.NotifSubscriptionKey;
93 import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev221121.notification.subscription.service.SubscriptionFilter;
94 import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev221121.notification.subscription.service.SubscriptionFilterBuilder;
95 import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev221121.notification.subscription.service.SubscriptionFilterKey;
96 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
97 import org.opendaylight.yangtools.yang.binding.Rpc;
98 import org.opendaylight.yangtools.yang.common.ErrorTag;
99 import org.opendaylight.yangtools.yang.common.ErrorType;
100 import org.opendaylight.yangtools.yang.common.RpcResult;
101 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
102 import org.slf4j.Logger;
103 import org.slf4j.LoggerFactory;
104
105 public class NbiNotificationsImpl implements NbiNotificationsService, TapiNotificationService {
106     private static final Logger LOG = LoggerFactory.getLogger(NbiNotificationsImpl.class);
107     private final JsonStringConverter<NotificationProcessService> converterService;
108     private final JsonStringConverter<NotificationAlarmService> converterAlarmService;
109     private final JsonStringConverter<NotificationTapiService> converterTapiService;
110     private final String server;
111     private final NetworkTransactionService networkTransactionService;
112     private final TopicManager topicManager;
113
114     public NbiNotificationsImpl(JsonStringConverter<NotificationProcessService> converterService,
115                                 JsonStringConverter<NotificationAlarmService> converterAlarmService,
116                                 JsonStringConverter<NotificationTapiService> converterTapiService, String server,
117                                 NetworkTransactionService networkTransactionService, TopicManager topicManager) {
118         this.converterService = converterService;
119         this.converterAlarmService = converterAlarmService;
120         this.converterTapiService = converterTapiService;
121         this.server = server;
122         this.networkTransactionService = networkTransactionService;
123         this.topicManager = topicManager;
124     }
125
126     @Override
127     public ListenableFuture<RpcResult<GetNotificationsProcessServiceOutput>> getNotificationsProcessService(
128             GetNotificationsProcessServiceInput input) {
129         LOG.info("RPC getNotificationsService received");
130         if (input == null || input.getIdConsumer() == null || input.getGroupId() == null) {
131             LOG.warn("Missing mandatory params for input {}", input);
132             return RpcResultBuilder.success(new GetNotificationsProcessServiceOutputBuilder().build()).buildFuture();
133         }
134         Subscriber<NotificationProcessService, NotificationsProcessService> subscriber = new Subscriber<>(
135                 input.getIdConsumer(), input.getGroupId(), server, converterService,
136                 NotificationServiceDeserializer.class);
137         List<NotificationsProcessService> notificationServiceList = subscriber
138                 .subscribe(input.getConnectionType().getName(), NotificationsProcessService.QNAME);
139         return RpcResultBuilder.success(new GetNotificationsProcessServiceOutputBuilder()
140                 .setNotificationsProcessService(notificationServiceList).build()).buildFuture();
141     }
142
143     @Override
144     public ListenableFuture<RpcResult<GetNotificationsAlarmServiceOutput>> getNotificationsAlarmService(
145             GetNotificationsAlarmServiceInput input) {
146         LOG.info("RPC getNotificationsAlarmService received");
147         if (input == null || input.getIdConsumer() == null || input.getGroupId() == null) {
148             LOG.warn("Missing mandatory params for input {}", input);
149             return RpcResultBuilder.success(new GetNotificationsAlarmServiceOutputBuilder().build()).buildFuture();
150         }
151         Subscriber<NotificationAlarmService, NotificationsAlarmService> subscriber = new Subscriber<>(
152                 input.getIdConsumer(), input.getGroupId(), server, converterAlarmService,
153                 NotificationAlarmServiceDeserializer.class);
154         List<NotificationsAlarmService> notificationAlarmServiceList = subscriber
155                 .subscribe("alarm" + input.getConnectionType().getName(), NotificationsAlarmService.QNAME);
156         return RpcResultBuilder.success(new GetNotificationsAlarmServiceOutputBuilder()
157                 .setNotificationsAlarmService(notificationAlarmServiceList).build()).buildFuture();
158     }
159
160     @Override
161     public ListenableFuture<RpcResult<GetSupportedNotificationTypesOutput>>
162             getSupportedNotificationTypes(GetSupportedNotificationTypesInput input) {
163         NotificationContext notificationContext = getNotificationContext();
164         if (notificationContext == null) {
165             return RpcResultBuilder.<GetSupportedNotificationTypesOutput>failed()
166                 .withError(ErrorType.APPLICATION, "Couldnt get Notification Context from Datastore")
167                 .buildFuture();
168         }
169         //TAPI 2.4 removes supported notification types from notif-subscription list and notification-context
170         //No way to store what notification types are supported
171         //Considers that by default all notification are supported
172         Set<NOTIFICATIONTYPE> notificationTypeList = new HashSet<>();
173         notificationTypeList.add(NOTIFICATIONTYPEOBJECTCREATION.VALUE);
174         notificationTypeList.add(NOTIFICATIONTYPEOBJECTDELETION.VALUE);
175         notificationTypeList.add(NOTIFICATIONTYPEATTRIBUTEVALUECHANGE.VALUE);
176 //
177 //        if (notificationContext.getNotifSubscription() == null) {
178 //            return RpcResultBuilder.success(new GetSupportedNotificationTypesOutputBuilder()
179 //                .setSupportedNotificationTypes(new HashSet<>())
180 //                .setSupportedObjectTypes(new HashSet<>()).build()).buildFuture();
181 //        }
182 //        Set<NOTIFICATIONTYPE> notificationTypeList = new HashSet<>();
183
184         //TAPI 2.4 removes supported object types from notif-subscription list and notification-context
185         //No way to store what object types are supported
186         //Considers that by default all object are supported
187         Set<OBJECTTYPE> objectTypeList = new HashSet<>();
188         objectTypeList.add(OBJECTTYPESERVICEINTERFACEPOINT.VALUE);
189         objectTypeList.add(OBJECTTYPETAPICONTEXT.VALUE);
190         objectTypeList.add(OBJECTTYPEPROFILE.VALUE);
191 //        for (NotifSubscription notifSubscription:notificationContext.getNotifSubscription().values()) {
192 //            if (notifSubscription.getSupportedNotificationTypes() != null) {
193 //                notificationTypeList.addAll(notifSubscription.getSupportedNotificationTypes());
194 //            }
195 //            if (notifSubscription.getSupportedObjectTypes() != null) {
196 //                objectTypeList.addAll(notifSubscription.getSupportedObjectTypes());
197 //            }
198 //        }
199         return RpcResultBuilder.success(new GetSupportedNotificationTypesOutputBuilder()
200             .setSupportedNotificationTypes(notificationTypeList)
201             .setSupportedObjectTypes(objectTypeList).build()).buildFuture();
202     }
203
204     @Override
205     public ListenableFuture<RpcResult<CreateNotificationSubscriptionServiceOutput>>
206             createNotificationSubscriptionService(CreateNotificationSubscriptionServiceInput input) {
207         for (Uuid uuid:input.getSubscriptionFilter().getRequestedObjectIdentifier()) {
208             LOG.info("Adding T-API topic: {} to Kafka server", uuid.getValue());
209             this.topicManager.addTapiTopic(uuid.getValue());
210         }
211         SubscriptionFilter subscriptionFilter = new SubscriptionFilterBuilder()
212             .setName(input.getSubscriptionFilter().getName())
213             .setLocalId(input.getSubscriptionFilter().getLocalId())
214             .setIncludeContent(input.getSubscriptionFilter().getIncludeContent())
215             .setRequestedNotificationTypes(input.getSubscriptionFilter().getRequestedNotificationTypes())
216             .setRequestedLayerProtocols(input.getSubscriptionFilter().getRequestedLayerProtocols())
217             .setRequestedObjectIdentifier(input.getSubscriptionFilter().getRequestedObjectIdentifier())
218             .setRequestedObjectTypes(input.getSubscriptionFilter().getRequestedObjectTypes())
219             .build();
220         Uuid notifSubscriptionUuid = new Uuid(UUID.randomUUID().toString());
221         Map<SubscriptionFilterKey, SubscriptionFilter> sfmap = new HashMap<>();
222         sfmap.put(subscriptionFilter.key(), subscriptionFilter);
223         SubscriptionService subscriptionService = new SubscriptionServiceBuilder()
224             .setSubscriptionFilter(sfmap)
225             .setSubscriptionState(input.getSubscriptionState())
226             .setUuid(notifSubscriptionUuid)
227             .build();
228
229         NotifSubscriptionKey notifSubscriptionKey = new NotifSubscriptionKey(notifSubscriptionUuid);
230         NotifSubscription notifSubscription = new NotifSubscriptionBuilder()
231             .setSubscriptionState(subscriptionService.getSubscriptionState())
232             .setSubscriptionFilter(subscriptionService.getSubscriptionFilter())
233             .setUuid(notifSubscriptionUuid)
234 //            Following 2 items are no more in notification-context with T-API 2.4
235 //            .setSupportedNotificationTypes(notificationTypes)
236 //            .setSupportedObjectTypes(objectTypes)
237             .setName(subscriptionService.getName())
238             .build();
239         NotificationContext notificationContext = getNotificationContext();
240         Map<NotifSubscriptionKey, NotifSubscription> notifSubscriptions = new HashMap<>();
241         if (notificationContext != null && notificationContext.getNotifSubscription() != null) {
242             notifSubscriptions.putAll(notificationContext.getNotifSubscription());
243         }
244         notifSubscriptions.put(notifSubscriptionKey, notifSubscription);
245         NotificationContext notificationContext1 = new NotificationContextBuilder()
246             .setNotification(notificationContext == null ? new HashMap<>() : notificationContext.getNotification())
247             .setNotifSubscription(notifSubscriptions)
248             .build();
249         if (!updateNotificationContext(notificationContext1)) {
250             LOG.error("Failed to update Notification context");
251             return RpcResultBuilder.<CreateNotificationSubscriptionServiceOutput>failed()
252                 .withError(ErrorType.RPC, "Failed to update notification context").buildFuture();
253         }
254         CreateNotificationSubscriptionServiceOutput serviceOutput =
255             new CreateNotificationSubscriptionServiceOutputBuilder()
256                 .setSubscriptionService(subscriptionService)
257                 .build();
258         return RpcResultBuilder.success(serviceOutput).buildFuture();
259     }
260
261     @Override
262     public ListenableFuture<RpcResult<UpdateNotificationSubscriptionServiceOutput>>
263             updateNotificationSubscriptionService(UpdateNotificationSubscriptionServiceInput input) {
264         // TODO --> Not yet implemented
265         return RpcResultBuilder.<UpdateNotificationSubscriptionServiceOutput>failed()
266             .withError(ErrorType.RPC, ErrorTag.OPERATION_NOT_SUPPORTED, "RPC not implemented yet")
267             .buildFuture();
268     }
269
270     @Override
271     public ListenableFuture<RpcResult<DeleteNotificationSubscriptionServiceOutput>>
272             deleteNotificationSubscriptionService(DeleteNotificationSubscriptionServiceInput input) {
273         try {
274             if (input == null || input.getUuid() == null) {
275                 LOG.warn("Missing mandatory params for input {}", input);
276                 return RpcResultBuilder.<DeleteNotificationSubscriptionServiceOutput>failed()
277                     .withError(ErrorType.RPC, "Missing input parameters").buildFuture();
278             }
279             Uuid notifSubsUuid = input.getUuid();
280             InstanceIdentifier<NotifSubscription> notifSubscriptionIID = InstanceIdentifier.builder(Context.class)
281                 .augmentation(Context1.class).child(NotificationContext.class).child(NotifSubscription.class,
282                     new NotifSubscriptionKey(notifSubsUuid)).build();
283             Optional<NotifSubscription> optionalNotifSub = this.networkTransactionService.read(
284                 LogicalDatastoreType.OPERATIONAL, notifSubscriptionIID).get();
285
286             if (optionalNotifSub.isEmpty()) {
287                 return RpcResultBuilder.<DeleteNotificationSubscriptionServiceOutput>failed()
288                     .withError(ErrorType.APPLICATION,
289                         "Notification subscription doesnt exist").buildFuture();
290             }
291             NotifSubscription notifSubscription = optionalNotifSub.orElseThrow();
292             this.networkTransactionService.delete(LogicalDatastoreType.OPERATIONAL, notifSubscriptionIID);
293             this.networkTransactionService.commit().get();
294             for (Map.Entry<SubscriptionFilterKey, SubscriptionFilter> sfEntry : notifSubscription
295                     .getSubscriptionFilter().entrySet()) {
296                 for (Uuid objectUuid:sfEntry.getValue().getRequestedObjectIdentifier()) {
297                     this.topicManager.deleteTapiTopic(objectUuid.getValue());
298                 }
299             }
300 //            for (Uuid objectUuid:notifSubscription.getSubscriptionFilter().getRequestedObjectIdentifier()) {
301 //                this.topicManager.deleteTapiTopic(objectUuid.getValue());
302 //            }
303             return RpcResultBuilder.success(new DeleteNotificationSubscriptionServiceOutputBuilder().build())
304                 .buildFuture();
305         } catch (InterruptedException | ExecutionException | NoSuchElementException e) {
306             LOG.error("Failed to delete Notification subscription service", e);
307         }
308         return RpcResultBuilder.<DeleteNotificationSubscriptionServiceOutput>failed()
309             .withError(ErrorType.APPLICATION,
310                 "Failed to delete notification subscription service").buildFuture();
311     }
312
313     @Override
314     public ListenableFuture<RpcResult<GetNotificationSubscriptionServiceDetailsOutput>>
315             getNotificationSubscriptionServiceDetails(GetNotificationSubscriptionServiceDetailsInput input) {
316         if (input == null || input.getUuid() == null) {
317             LOG.warn("Missing mandatory params for input {}", input);
318             return RpcResultBuilder.<GetNotificationSubscriptionServiceDetailsOutput>failed()
319                 .withError(ErrorType.RPC, "Missing input parameters").buildFuture();
320         }
321         Uuid notifSubsUuid = input.getUuid();
322         NotificationContext notificationContext = getNotificationContext();
323         if (notificationContext == null) {
324             return RpcResultBuilder.<GetNotificationSubscriptionServiceDetailsOutput>failed()
325                 .withError(ErrorType.APPLICATION, "Notification context is empty")
326                 .buildFuture();
327         }
328         if (notificationContext.getNotifSubscription() == null) {
329             return RpcResultBuilder.success(new GetNotificationSubscriptionServiceDetailsOutputBuilder()
330                 .setSubscriptionService(new org.opendaylight.yang.gen.v1
331                     .urn.onf.otcc.yang.tapi.notification.rev221121.get.notification.subscription.service
332                         .details.output.SubscriptionServiceBuilder().build()).build()).buildFuture();
333         }
334         if (!notificationContext.getNotifSubscription().containsKey(new NotifSubscriptionKey(notifSubsUuid))) {
335             return RpcResultBuilder.<GetNotificationSubscriptionServiceDetailsOutput>failed()
336                 .withError(ErrorType.APPLICATION,
337                     "Notification subscription service doesnt exist").buildFuture();
338         }
339         return RpcResultBuilder.success(new GetNotificationSubscriptionServiceDetailsOutputBuilder()
340             .setSubscriptionService(new org.opendaylight.yang.gen.v1.urn
341                 .onf.otcc.yang.tapi.notification.rev221121.get.notification.subscription.service.details.output
342                 .SubscriptionServiceBuilder(notificationContext.getNotifSubscription().get(
343                     new NotifSubscriptionKey(notifSubsUuid))).build()).build()).buildFuture();
344     }
345
346     @Override
347     public ListenableFuture<RpcResult<GetNotificationSubscriptionServiceListOutput>>
348             getNotificationSubscriptionServiceList(GetNotificationSubscriptionServiceListInput input) {
349         NotificationContext notificationContext = getNotificationContext();
350         if (notificationContext == null) {
351             return RpcResultBuilder.<GetNotificationSubscriptionServiceListOutput>failed()
352                 .withError(ErrorType.APPLICATION, "Notification context is empty")
353                 .buildFuture();
354         }
355         if (notificationContext.getNotifSubscription() == null) {
356             return RpcResultBuilder.success(new GetNotificationSubscriptionServiceListOutputBuilder()
357                 .setSubscriptionService(new HashMap<>()).build()).buildFuture();
358         }
359         Map<SubscriptionServiceKey, org.opendaylight.yang.gen.v1.urn.onf.otcc.yang
360             .tapi.notification.rev221121.get.notification.subscription.service.list.output.SubscriptionService>
361                 notifSubsMap = new HashMap<>();
362         for (NotifSubscription notifSubscription:notificationContext.getNotifSubscription().values()) {
363             org.opendaylight.yang.gen.v1.urn.onf.otcc.yang
364                 .tapi.notification.rev221121.get.notification.subscription.service.list.output.SubscriptionService
365                     subscriptionService = new org.opendaylight.yang.gen.v1
366                         .urn.onf.otcc.yang.tapi.notification.rev221121.get.notification.subscription.service
367                             .list.output.SubscriptionServiceBuilder(notifSubscription).build();
368             notifSubsMap.put(subscriptionService.key(), subscriptionService);
369         }
370         return RpcResultBuilder.success(new GetNotificationSubscriptionServiceListOutputBuilder()
371             .setSubscriptionService(notifSubsMap).build()).buildFuture();
372     }
373
374     @Override
375     public ListenableFuture<RpcResult<GetNotificationListOutput>> getNotificationList(GetNotificationListInput input) {
376         try {
377             LOG.info("RPC getNotificationList received");
378             if (input == null || input.getSubscriptionId() == null) {
379                 LOG.warn("Missing mandatory params for input {}", input);
380                 return RpcResultBuilder.<GetNotificationListOutput>failed().withError(ErrorType.RPC,
381                     "Missing input parameters").buildFuture();
382             }
383             Uuid notifSubsUuid = input.getSubscriptionId();
384             InstanceIdentifier<NotifSubscription> notifSubscriptionIID = InstanceIdentifier.builder(Context.class)
385                 .augmentation(Context1.class).child(NotificationContext.class).child(NotifSubscription.class,
386                     new NotifSubscriptionKey(notifSubsUuid)).build();
387             Optional<NotifSubscription> optionalNotifSub = this.networkTransactionService.read(
388                 LogicalDatastoreType.OPERATIONAL, notifSubscriptionIID).get();
389
390             if (optionalNotifSub.isEmpty()) {
391                 return RpcResultBuilder.<GetNotificationListOutput>failed()
392                     .withError(ErrorType.APPLICATION,
393                         "Notification subscription doesnt exist").buildFuture();
394             }
395             NotifSubscription notifSubscription = optionalNotifSub.orElseThrow();
396             List<Notification> notificationTapiList = new ArrayList<>();
397             for (Map.Entry<SubscriptionFilterKey, SubscriptionFilter> sfEntry : notifSubscription
398                     .getSubscriptionFilter().entrySet()) {
399                 for (Uuid objectUuid:sfEntry.getValue().getRequestedObjectIdentifier()) {
400                     if (!this.topicManager.getTapiTopicMap().containsKey(objectUuid.getValue())) {
401                         LOG.warn("Topic doesnt exist for {}", objectUuid.getValue());
402                         continue;
403                     }
404                     LOG.info("Going to get notifications for topic {}", objectUuid.getValue());
405                     Subscriber<NotificationTapiService, Notification> subscriber = new Subscriber<>(
406                         objectUuid.getValue(), objectUuid.getValue(), server, converterTapiService,
407                         TapiNotificationDeserializer.class);
408                     notificationTapiList.addAll(subscriber.subscribe(objectUuid.getValue(), Notification.QNAME));
409                 }
410             }
411 //            for (Uuid objectUuid:notifSubscription.getSubscriptionFilter().getRequestedObjectIdentifier()) {
412 //                if (!this.topicManager.getTapiTopicMap().containsKey(objectUuid.getValue())) {
413 //                    LOG.warn("Topic doesnt exist for {}", objectUuid.getValue());
414 //                    continue;
415 //                }
416 //                LOG.info("Going to get notifications for topic {}", objectUuid.getValue());
417 //                Subscriber<NotificationTapiService, Notification> subscriber = new Subscriber<>(
418 //                    objectUuid.getValue(), objectUuid.getValue(), server, converterTapiService,
419 //                    TapiNotificationDeserializer.class);
420 //                notificationTapiList.addAll(subscriber.subscribe(objectUuid.getValue(), Notification.QNAME));
421 //            }
422             LOG.info("TAPI notifications = {}", notificationTapiList);
423             Map<NotificationKey, Notification> notificationMap = new HashMap<>();
424             for (Notification notif:notificationTapiList) {
425                 notificationMap.put(notif.key(), notif);
426             }
427             return RpcResultBuilder.success(new GetNotificationListOutputBuilder()
428                 .setNotification(notificationMap).build()).buildFuture();
429         } catch (InterruptedException | ExecutionException | NoSuchElementException e) {
430             LOG.error("Failed to get Notifications from Kafka", e);
431         }
432         return RpcResultBuilder.<GetNotificationListOutput>failed()
433             .withError(ErrorType.APPLICATION,
434                 "Notifications couldnt be retrieved from Kafka server").buildFuture();
435     }
436
437     public ImmutableClassToInstanceMap<Rpc<?, ?>> registerRPCs() {
438         return ImmutableClassToInstanceMap.<Rpc<?, ?>>builder()
439             .put(GetNotificationsProcessService.class, this::getNotificationsProcessService)
440             .put(GetNotificationsAlarmService.class, this::getNotificationsAlarmService)
441             .put(GetSupportedNotificationTypes.class, this::getSupportedNotificationTypes)
442             .put(CreateNotificationSubscriptionService.class, this::createNotificationSubscriptionService)
443             .put(UpdateNotificationSubscriptionService.class, this::updateNotificationSubscriptionService)
444             .put(DeleteNotificationSubscriptionService.class, this::deleteNotificationSubscriptionService)
445             .put(GetNotificationSubscriptionServiceDetails.class, this::getNotificationSubscriptionServiceDetails)
446             .put(GetNotificationSubscriptionServiceList.class, this::getNotificationSubscriptionServiceList)
447             .put(GetNotificationList.class, this::getNotificationList)
448             .build();
449     }
450
451     private NotificationContext getNotificationContext() {
452         LOG.info("Getting tapi notification context");
453         try {
454             InstanceIdentifier<NotificationContext> notificationcontextIID =
455                 InstanceIdentifier.builder(Context.class).augmentation(Context1.class)
456                     .child(NotificationContext.class).build();
457             Optional<NotificationContext> notificationContextOptional
458                 = this.networkTransactionService.read(LogicalDatastoreType.OPERATIONAL, notificationcontextIID).get();
459             if (!notificationContextOptional.isPresent()) {
460                 LOG.error("Could not get TAPI notification context");
461                 return null;
462             }
463             return notificationContextOptional.orElseThrow();
464         } catch (InterruptedException | ExecutionException e) {
465             LOG.error("Could not get TAPI notification context");
466         }
467         return null;
468     }
469
470     private boolean updateNotificationContext(NotificationContext notificationContext1) {
471         try {
472             InstanceIdentifier<NotificationContext> notificationcontextIID =
473                 InstanceIdentifier.builder(Context.class).augmentation(Context1.class)
474                     .child(NotificationContext.class).build();
475             this.networkTransactionService.merge(LogicalDatastoreType.OPERATIONAL, notificationcontextIID,
476                 notificationContext1);
477             this.networkTransactionService.commit().get();
478             return true;
479         } catch (InterruptedException | ExecutionException e) {
480             LOG.error("Could not update TAPI notification context");
481         }
482         return false;
483     }
484
485 }