2 * Copyright © 2020 Orange, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.transportpce.nbinotifications.impl;
10 import com.google.common.util.concurrent.ListenableFuture;
11 import java.util.HashMap;
12 import java.util.HashSet;
13 import java.util.List;
15 import java.util.Optional;
17 import java.util.UUID;
18 import java.util.concurrent.ExecutionException;
19 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
20 import org.opendaylight.transportpce.common.converter.JsonStringConverter;
21 import org.opendaylight.transportpce.common.network.NetworkTransactionService;
22 import org.opendaylight.transportpce.nbinotifications.consumer.Subscriber;
23 import org.opendaylight.transportpce.nbinotifications.serialization.NotificationAlarmServiceDeserializer;
24 import org.opendaylight.transportpce.nbinotifications.serialization.NotificationServiceDeserializer;
25 import org.opendaylight.transportpce.nbinotifications.utils.TopicManager;
26 import org.opendaylight.yang.gen.v1.nbi.notifications.rev211013.GetNotificationsAlarmServiceInput;
27 import org.opendaylight.yang.gen.v1.nbi.notifications.rev211013.GetNotificationsAlarmServiceOutput;
28 import org.opendaylight.yang.gen.v1.nbi.notifications.rev211013.GetNotificationsAlarmServiceOutputBuilder;
29 import org.opendaylight.yang.gen.v1.nbi.notifications.rev211013.GetNotificationsProcessServiceInput;
30 import org.opendaylight.yang.gen.v1.nbi.notifications.rev211013.GetNotificationsProcessServiceOutput;
31 import org.opendaylight.yang.gen.v1.nbi.notifications.rev211013.GetNotificationsProcessServiceOutputBuilder;
32 import org.opendaylight.yang.gen.v1.nbi.notifications.rev211013.NbiNotificationsService;
33 import org.opendaylight.yang.gen.v1.nbi.notifications.rev211013.NotificationAlarmService;
34 import org.opendaylight.yang.gen.v1.nbi.notifications.rev211013.NotificationProcessService;
35 import org.opendaylight.yang.gen.v1.nbi.notifications.rev211013.get.notifications.alarm.service.output.NotificationsAlarmService;
36 import org.opendaylight.yang.gen.v1.nbi.notifications.rev211013.get.notifications.process.service.output.NotificationsProcessService;
37 import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.common.rev181210.Context;
38 import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.common.rev181210.Uuid;
39 import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev181210.Context1;
40 import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev181210.CreateNotificationSubscriptionServiceInput;
41 import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev181210.CreateNotificationSubscriptionServiceOutput;
42 import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev181210.CreateNotificationSubscriptionServiceOutputBuilder;
43 import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev181210.DeleteNotificationSubscriptionServiceInput;
44 import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev181210.DeleteNotificationSubscriptionServiceOutput;
45 import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev181210.GetNotificationListInput;
46 import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev181210.GetNotificationListOutput;
47 import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev181210.GetNotificationSubscriptionServiceDetailsInput;
48 import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev181210.GetNotificationSubscriptionServiceDetailsOutput;
49 import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev181210.GetNotificationSubscriptionServiceListInput;
50 import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev181210.GetNotificationSubscriptionServiceListOutput;
51 import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev181210.GetSupportedNotificationTypesInput;
52 import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev181210.GetSupportedNotificationTypesOutput;
53 import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev181210.NotificationType;
54 import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev181210.ObjectType;
55 import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev181210.TapiNotificationService;
56 import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev181210.UpdateNotificationSubscriptionServiceInput;
57 import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev181210.UpdateNotificationSubscriptionServiceOutput;
58 import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev181210.context.NotificationContext;
59 import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev181210.context.NotificationContextBuilder;
60 import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev181210.create.notification.subscription.service.output.SubscriptionService;
61 import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev181210.create.notification.subscription.service.output.SubscriptionServiceBuilder;
62 import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev181210.notification.context.NotifSubscription;
63 import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev181210.notification.context.NotifSubscriptionBuilder;
64 import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev181210.notification.context.NotifSubscriptionKey;
65 import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev181210.notification.subscription.service.SubscriptionFilter;
66 import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev181210.notification.subscription.service.SubscriptionFilterBuilder;
67 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
68 import org.opendaylight.yangtools.yang.common.ErrorType;
69 import org.opendaylight.yangtools.yang.common.RpcResult;
70 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
71 import org.slf4j.Logger;
72 import org.slf4j.LoggerFactory;
74 public class NbiNotificationsImpl implements NbiNotificationsService, TapiNotificationService {
/** Logger shared by every RPC handler of this class. */
private static final Logger LOG = LoggerFactory.getLogger(NbiNotificationsImpl.class);
/** Converter handed to the Subscriber that reads process-service notifications. */
private final JsonStringConverter<NotificationProcessService> converterService;
/** Converter handed to the Subscriber that reads alarm-service notifications. */
private final JsonStringConverter<NotificationAlarmService> converterAlarmService;
/** Server string passed to every Subscriber; presumably the Kafka bootstrap address (TODO confirm). */
private final String server;
/** Datastore access used to read and merge the TAPI notification context. */
private final NetworkTransactionService networkTransactionService;
/** Creates the T-API topics requested when a notification subscription is created. */
private final TopicManager topicManager;

/**
 * Creates the RPC implementation with all of its collaborators.
 *
 * @param converterService JSON converter for process-service notifications
 * @param converterAlarmService JSON converter for alarm-service notifications
 * @param server server string forwarded to each Subscriber created by the RPC handlers
 * @param networkTransactionService datastore transaction service
 * @param topicManager manager used to register T-API topics
 */
public NbiNotificationsImpl(JsonStringConverter<NotificationProcessService> converterService,
        JsonStringConverter<NotificationAlarmService> converterAlarmService, String server,
        NetworkTransactionService networkTransactionService, TopicManager topicManager) {
    this.converterService = converterService;
    this.converterAlarmService = converterAlarmService;
    // The final field must be assigned here: both getNotifications* handlers pass it to Subscriber.
    this.server = server;
    this.networkTransactionService = networkTransactionService;
    this.topicManager = topicManager;
}
93 public ListenableFuture<RpcResult<GetNotificationsProcessServiceOutput>> getNotificationsProcessService(
94 GetNotificationsProcessServiceInput input) {
95 LOG.info("RPC getNotificationsService received");
96 if (input == null || input.getIdConsumer() == null || input.getGroupId() == null) {
97 LOG.warn("Missing mandatory params for input {}", input);
98 return RpcResultBuilder.success(new GetNotificationsProcessServiceOutputBuilder().build()).buildFuture();
100 Subscriber<NotificationProcessService, NotificationsProcessService> subscriber = new Subscriber<>(
101 input.getIdConsumer(), input.getGroupId(), server, converterService,
102 NotificationServiceDeserializer.class);
103 List<NotificationsProcessService> notificationServiceList = subscriber
104 .subscribe(input.getConnectionType().getName(), NotificationsProcessService.QNAME);
105 return RpcResultBuilder.success(new GetNotificationsProcessServiceOutputBuilder()
106 .setNotificationsProcessService(notificationServiceList).build()).buildFuture();
110 public ListenableFuture<RpcResult<GetNotificationsAlarmServiceOutput>> getNotificationsAlarmService(
111 GetNotificationsAlarmServiceInput input) {
112 LOG.info("RPC getNotificationsAlarmService received");
113 if (input == null || input.getIdConsumer() == null || input.getGroupId() == null) {
114 LOG.warn("Missing mandatory params for input {}", input);
115 return RpcResultBuilder.success(new GetNotificationsAlarmServiceOutputBuilder().build()).buildFuture();
117 Subscriber<NotificationAlarmService, NotificationsAlarmService> subscriber = new Subscriber<>(
118 input.getIdConsumer(), input.getGroupId(), server, converterAlarmService,
119 NotificationAlarmServiceDeserializer.class);
120 List<NotificationsAlarmService> notificationAlarmServiceList = subscriber
121 .subscribe("alarm" + input.getConnectionType().getName(), NotificationsAlarmService.QNAME);
122 return RpcResultBuilder.success(new GetNotificationsAlarmServiceOutputBuilder()
123 .setNotificationsAlarmService(notificationAlarmServiceList).build()).buildFuture();
127 public ListenableFuture<RpcResult<GetSupportedNotificationTypesOutput>>
128 getSupportedNotificationTypes(GetSupportedNotificationTypesInput input) {
133 public ListenableFuture<RpcResult<CreateNotificationSubscriptionServiceOutput>>
134 createNotificationSubscriptionService(CreateNotificationSubscriptionServiceInput input) {
136 for (Uuid uuid:input.getSubscriptionFilter().getRequestedObjectIdentifier()) {
137 LOG.info("Adding T-API topic: {} to Kafka server", uuid.getValue());
138 this.topicManager.addTapiTopic(uuid.getValue());
141 InstanceIdentifier<NotificationContext> notificationcontextIID =
142 InstanceIdentifier.builder(Context.class).augmentation(Context1.class)
143 .child(org.opendaylight.yang.gen.v1.urn
144 .onf.otcc.yang.tapi.notification.rev181210.context.NotificationContext.class)
146 Optional<NotificationContext> notificationContextOptional
147 = this.networkTransactionService.read(LogicalDatastoreType.OPERATIONAL, notificationcontextIID).get();
148 if (notificationContextOptional.isEmpty()) {
149 LOG.error("Could not create TAPI notification subscription service");
150 return RpcResultBuilder.<CreateNotificationSubscriptionServiceOutput>failed()
151 .withError(ErrorType.RPC, "Could not read notification context")
154 SubscriptionFilter subscriptionFilter = new SubscriptionFilterBuilder()
155 .setName(input.getSubscriptionFilter().getName())
156 .setLocalId(input.getSubscriptionFilter().getLocalId())
157 .setIncludeContent(input.getSubscriptionFilter().getIncludeContent())
158 .setRequestedNotificationTypes(input.getSubscriptionFilter().getRequestedNotificationTypes())
159 .setRequestedLayerProtocols(input.getSubscriptionFilter().getRequestedLayerProtocols())
160 .setRequestedObjectIdentifier(input.getSubscriptionFilter().getRequestedObjectIdentifier())
161 .setRequestedObjectTypes(input.getSubscriptionFilter().getRequestedObjectTypes())
163 SubscriptionService subscriptionService = new SubscriptionServiceBuilder()
164 .setSubscriptionFilter(subscriptionFilter)
165 .setSubscriptionState(input.getSubscriptionState()).build();
166 Uuid notifSubscriptionUuid = new Uuid(UUID.randomUUID().toString());
167 NotifSubscriptionKey notifSubscriptionKey = new NotifSubscriptionKey(notifSubscriptionUuid);
168 Set<NotificationType> notificationTypes = (subscriptionFilter.getRequestedNotificationTypes() != null)
169 ? subscriptionFilter.getRequestedNotificationTypes()
170 : new HashSet<>(List.of(NotificationType.ALARMEVENT));
171 Set<ObjectType> objectTypes = (subscriptionFilter.getRequestedObjectTypes() != null)
172 ? subscriptionFilter.getRequestedObjectTypes()
173 : new HashSet<>(List.of(ObjectType.CONNECTIVITYSERVICE));
174 NotifSubscription notifSubscription = new NotifSubscriptionBuilder()
175 .setSubscriptionState(subscriptionService.getSubscriptionState())
176 .setSubscriptionFilter(subscriptionService.getSubscriptionFilter())
177 .setUuid(notifSubscriptionUuid)
178 .setSupportedNotificationTypes(notificationTypes)
179 .setSupportedObjectTypes(objectTypes)
180 .setName(subscriptionService.getName())
182 NotificationContext notificationContext = notificationContextOptional.get();
183 Map<NotifSubscriptionKey, NotifSubscription> notifSubscriptions = new HashMap<>();
184 if (notificationContext.getNotifSubscription() != null) {
185 notifSubscriptions.putAll(notificationContext.getNotifSubscription());
187 notifSubscriptions.put(notifSubscriptionKey, notifSubscription);
188 NotificationContext notificationContext1 = new NotificationContextBuilder()
189 .setNotification(notificationContext.getNotification())
190 .setNotifSubscription(notifSubscriptions)
192 this.networkTransactionService.merge(LogicalDatastoreType.OPERATIONAL, notificationcontextIID,
193 notificationContext1);
194 this.networkTransactionService.commit().get();
195 CreateNotificationSubscriptionServiceOutput serviceOutput =
196 new CreateNotificationSubscriptionServiceOutputBuilder()
197 .setSubscriptionService(subscriptionService)
199 return RpcResultBuilder.success(serviceOutput).buildFuture();
200 } catch (InterruptedException | ExecutionException e) {
201 LOG.error("Could not create TAPI notification subscription service");
202 return RpcResultBuilder.<CreateNotificationSubscriptionServiceOutput>failed()
203 .withError(ErrorType.RPC, "Could not read notification context").buildFuture();
208 public ListenableFuture<RpcResult<UpdateNotificationSubscriptionServiceOutput>>
209 updateNotificationSubscriptionService(UpdateNotificationSubscriptionServiceInput input) {
214 public ListenableFuture<RpcResult<DeleteNotificationSubscriptionServiceOutput>>
215 deleteNotificationSubscriptionService(DeleteNotificationSubscriptionServiceInput input) {
220 public ListenableFuture<RpcResult<GetNotificationSubscriptionServiceDetailsOutput>>
221 getNotificationSubscriptionServiceDetails(GetNotificationSubscriptionServiceDetailsInput input) {
226 public ListenableFuture<RpcResult<GetNotificationSubscriptionServiceListOutput>>
227 getNotificationSubscriptionServiceList(GetNotificationSubscriptionServiceListInput input) {
232 public ListenableFuture<RpcResult<GetNotificationListOutput>> getNotificationList(GetNotificationListInput input) {