"YANG definitions for using REST API in NBI notifications module. Copyright
(c) 2020 ORANGE and others. All rights reserved.";
+ revision 2021-06-28 {
+ description
+ "Implement new models, RPC for service alarms";
+ }
+
revision 2020-11-30 {
description
"Initial revision of NBI notifications";
}
}
+ grouping notification-alarm-service {
+ leaf message {
+ type string;
+ mandatory true;
+ description
+ "Message for the specified service";
+ }
+ leaf service-name {
+ type string;
+ mandatory true;
+ description
+ "Identifier for the service to be created in the ROADM network, e.g., CLFI, CLCI, etc.
+ This is reported against the service, but may not get reflected in the service in the network.";
+ }
+ leaf connection-type {
+ type org-openroadm-common-service-types:connection-type;
+ mandatory true;
+ }
+ leaf operational-state {
+ type org-openroadm-common-state-types:state;
+ config false;
+ description
+ "Operational State: Actual state of service";
+ }
+ }
+
container notification-service {
description
"Model used to send a notification from a service request";
uses notification-service;
}
+ container notification-alarm-service {
+ description
+ "Model used to send a notification from the service listener";
+ uses notification-alarm-service;
+ }
+
rpc get-notifications-service {
- description "Get the notifications service send by ServiceHandler by filtering through connection type";
+ description "Get the notifications service sent by ServiceHandler through filtering connection type";
input {
leaf connection-type {
type org-openroadm-common-service-types:connection-type;
}
}
+ rpc get-notifications-alarm-service {
+ description "Get the notifications alarm service sent by ServiceListener through filtering connection type";
+ input {
+ leaf connection-type {
+ type org-openroadm-common-service-types:connection-type;
+ mandatory true;
+ description
+ "Type connection of the service";
+ }
+ leaf id-consumer {
+ type string;
+ mandatory true;
+ description
+ "Unique ID for the consumer";
+ }
+ leaf group-id {
+ type string;
+ mandatory true;
+ description
+ "ID Group for the consumer";
+ }
+ }
+ output {
+ list notification-alarm-service {
+ uses notification-alarm-service;
+ }
+ }
+ }
+
notification publish-notification-service {
description "Publish the notifications service for topic";
leaf topic {
}
uses notification-service;
}
+
+ notification publish-notification-alarm-service {
+ description "Publish the notifications service alarm for topic";
+ leaf topic {
+ type string;
+ mandatory true;
+ description
+ "Topic where to send the notification service alarm";
+ }
+ uses notification-alarm-service;
+ }
}
import org.opendaylight.mdsal.binding.api.NotificationService;
import org.opendaylight.transportpce.dmaap.client.listener.NbiNotificationsListenerImpl;
-import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.NbiNotificationsListener;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.NbiNotificationsListener;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.opendaylight.transportpce.dmaap.client.resource.EventsApi;
import org.opendaylight.transportpce.dmaap.client.resource.config.JsonConfigurator;
import org.opendaylight.transportpce.dmaap.client.resource.model.CreatedEvent;
-import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.NbiNotificationsListener;
-import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.PublishNotificationService;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.NbiNotificationsListener;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.PublishNotificationAlarmService;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.PublishNotificationService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
}
+ @Override
+ public void onPublishNotificationAlarmService(PublishNotificationAlarmService notification) {
+
+ }
+
}
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;
import org.opendaylight.transportpce.dmaap.client.resource.model.CreatedEvent;
-import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.PublishNotificationService;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.PublishNotificationService;
@Path("/events")
public interface EventsApi {
import org.opendaylight.yang.gen.v1.http.org.openroadm.common.service.types.rev190531.service.endpoint.TxDirection;
import org.opendaylight.yang.gen.v1.http.org.openroadm.common.service.types.rev190531.service.lgx.Lgx;
import org.opendaylight.yang.gen.v1.http.org.openroadm.common.service.types.rev190531.service.port.Port;
-import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.PublishNotificationService;
-import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.notification.service.ServiceAEnd;
-import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.notification.service.ServiceZEnd;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.PublishNotificationService;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.notification.service.ServiceAEnd;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.notification.service.ServiceZEnd;
//This class is a temporary workaround while waiting jackson
//support in yang tools https://git.opendaylight.org/gerrit/c/yangtools/+/94852
import com.fasterxml.jackson.databind.SerializerProvider;
import com.fasterxml.jackson.databind.ser.std.StdSerializer;
import java.io.IOException;
-import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.PublishNotificationService;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.PublishNotificationService;
// This class is a temporary workaround while waiting jackson
// support in yang tools https://git.opendaylight.org/gerrit/c/yangtools/+/94852
import com.fasterxml.jackson.databind.SerializerProvider;
import com.fasterxml.jackson.databind.ser.std.StdSerializer;
import java.io.IOException;
-import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.notification.service.ServiceAEnd;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.notification.service.ServiceAEnd;
// This class is a temporary workaround while waiting jackson
// support in yang tools https://git.opendaylight.org/gerrit/c/yangtools/+/94852
import com.fasterxml.jackson.databind.SerializerProvider;
import com.fasterxml.jackson.databind.ser.std.StdSerializer;
import java.io.IOException;
-import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.notification.service.ServiceZEnd;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.notification.service.ServiceZEnd;
// This class is a temporary workaround while waiting jackson
// support in yang tools https://git.opendaylight.org/gerrit/c/yangtools/+/94852
import org.opendaylight.yang.gen.v1.http.org.openroadm.common.service.types.rev190531.service.endpoint.TxDirectionBuilder;
import org.opendaylight.yang.gen.v1.http.org.openroadm.common.state.types.rev181130.State;
import org.opendaylight.yang.gen.v1.http.org.openroadm.service.format.rev190531.ServiceFormat;
-import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.NbiNotificationsListener;
-import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.PublishNotificationService;
-import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.PublishNotificationServiceBuilder;
-import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.notification.service.ServiceAEndBuilder;
-import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.notification.service.ServiceZEndBuilder;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.NbiNotificationsListener;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.PublishNotificationService;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.PublishNotificationServiceBuilder;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.notification.service.ServiceAEndBuilder;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.notification.service.ServiceZEndBuilder;
import org.opendaylight.yangtools.yang.common.Uint32;
import org.slf4j.LoggerFactory;
import org.opendaylight.transportpce.servicehandler.listeners.NetworkModelListenerImpl;
import org.opendaylight.transportpce.servicehandler.listeners.PceListenerImpl;
import org.opendaylight.transportpce.servicehandler.listeners.RendererListenerImpl;
+import org.opendaylight.transportpce.servicehandler.listeners.ServiceListener;
import org.opendaylight.transportpce.servicehandler.service.ServiceDataStoreOperations;
import org.opendaylight.transportpce.servicehandler.service.ServiceDataStoreOperationsImpl;
import org.opendaylight.transportpce.tapi.R2RTapiLinkDiscovery;
/**
* List of publisher topics.
*/
- private final List<String> publisherTopicList =
- Arrays.asList("PceListener", "ServiceHandlerOperations", "ServiceHandler", "RendererListener");
+ private final List<String> publisherTopicList = Arrays.asList("PceListener", "ServiceHandlerOperations",
+ "ServiceHandler", "RendererListener");
+ private final List<String> publisherTopicAlarmList = Arrays.asList("ServiceListener");
public TransportPCEImpl(LightyServices lightyServices, boolean activateNbiNotification) {
LOG.info("Initializing transaction providers ...");
lightyServices.getBindingNotificationPublishService(), networkModelService);
PceListenerImpl pceListenerImpl = new PceListenerImpl(rendererServiceOperations, pathComputationService,
lightyServices.getBindingNotificationPublishService(), serviceDataStoreOperations);
+ ServiceListener serviceListener = new ServiceListener(lightyServices.getBindingDataBroker(),
+ lightyServices.getBindingNotificationPublishService());
NetworkModelListenerImpl networkModelListenerImpl = new NetworkModelListenerImpl(
lightyServices.getBindingNotificationPublishService(), serviceDataStoreOperations);
ServicehandlerImpl servicehandler = new ServicehandlerImpl(lightyServices.getBindingDataBroker(),
pceListenerImpl, rendererListenerImpl, networkModelListenerImpl, serviceDataStoreOperations, "N/A");
servicehandlerProvider = new ServicehandlerProvider(lightyServices.getBindingDataBroker(),
lightyServices.getRpcProviderService(), lightyServices.getNotificationService(),
- serviceDataStoreOperations, pceListenerImpl, rendererListenerImpl, networkModelListenerImpl,
- servicehandler);
+ serviceDataStoreOperations, pceListenerImpl, serviceListener, rendererListenerImpl,
+ networkModelListenerImpl, servicehandler);
LOG.info("Creating tapi beans ...");
R2RTapiLinkDiscovery tapilinkDiscoveryImpl = new R2RTapiLinkDiscovery(lightyServices.getBindingDataBroker(),
if (activateNbiNotification) {
LOG.info("Creating nbi-notifications beans ...");
nbiNotificationsProvider = new NbiNotificationsProvider(
- publisherTopicList, null, null, lightyServices.getRpcProviderService(),
+ publisherTopicList, publisherTopicAlarmList, null, null, lightyServices.getRpcProviderService(),
lightyServices.getNotificationService(), lightyServices.getAdapterContext().currentSerializer());
}
}
.getInstance(),
org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.tapi.rev180928.$YangModuleInfoImpl
.getInstance(),
- org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.$YangModuleInfoImpl
+ org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.$YangModuleInfoImpl
.getInstance());
private static final Set<YangModuleInfo> TPCE_YANG_MODEL = Stream.concat(
import org.opendaylight.transportpce.nbinotifications.serialization.ConfigConstants;
import org.opendaylight.transportpce.nbinotifications.serialization.NotificationServiceDeserializer;
import org.opendaylight.transportpce.nbinotifications.utils.NbiNotificationsUtils;
-import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.get.notifications.service.output.NotificationService;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.get.notifications.service.output.NotificationService;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public Subscriber(String id, String groupId, String suscriberServer,
JsonStringConverter<org.opendaylight.yang.gen.v1
- .nbi.notifications.rev201130.NotificationService> deserializer) {
+ .nbi.notifications.rev210628.NotificationService> deserializer) {
Properties propsConsumer = NbiNotificationsUtils.loadProperties("subscriber.properties");
propsConsumer.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
propsConsumer.put(ConsumerConfig.CLIENT_ID_CONFIG, id);
--- /dev/null
+/*
+ * Copyright © 2021 Orange, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.transportpce.nbinotifications.consumer;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.opendaylight.transportpce.common.converter.JsonStringConverter;
+import org.opendaylight.transportpce.nbinotifications.serialization.ConfigConstants;
+import org.opendaylight.transportpce.nbinotifications.serialization.NotificationAlarmServiceDeserializer;
+import org.opendaylight.transportpce.nbinotifications.utils.NbiNotificationsUtils;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.get.notifications.alarm.service.output.NotificationAlarmService;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SubscriberAlarm {
+ private static final Logger LOG = LoggerFactory.getLogger(SubscriberAlarm.class);
+
+ private final Consumer<String, NotificationAlarmService> consumer;
+
+ public SubscriberAlarm(String id, String groupId, String subscriberServer,
+ JsonStringConverter<org.opendaylight.yang.gen.v1
+ .nbi.notifications.rev210628.NotificationAlarmService> deserializer) {
+ Properties propsConsumer = NbiNotificationsUtils.loadProperties("subscriber.properties");
+ propsConsumer.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
+ propsConsumer.put(ConsumerConfig.CLIENT_ID_CONFIG, id);
+ propsConsumer.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+ propsConsumer.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG , NotificationAlarmServiceDeserializer.class);
+ propsConsumer.put(ConfigConstants.CONVERTER , deserializer);
+ if (subscriberServer != null && !subscriberServer.isBlank()) {
+ propsConsumer.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, subscriberServer);
+ }
+ LOG.info("Subscribing for group id {}, client config id {} with properties {}", groupId, id, propsConsumer);
+ consumer = new KafkaConsumer<>(propsConsumer);
+ }
+
+ public List<NotificationAlarmService> subscribeAlarm(String topicName) {
+ LOG.info("Subscribe request to topic '{}' ", topicName);
+ consumer.subscribe(Collections.singleton(topicName));
+ final ConsumerRecords<String, NotificationAlarmService> consumerRecords = consumer
+ .poll(Duration.ofMillis(1000));
+ List<NotificationAlarmService> notificationAlarmServiceList = new ArrayList<>();
+ YangInstanceIdentifier.of(NotificationAlarmService.QNAME);
+ for (ConsumerRecord<String, NotificationAlarmService> record : consumerRecords) {
+ if (record.value() != null) {
+ notificationAlarmServiceList.add(record.value());
+ }
+ }
+ LOG.info("Getting records '{}' ", notificationAlarmServiceList);
+ consumer.unsubscribe();
+ consumer.close();
+ return notificationAlarmServiceList;
+ }
+
+ @VisibleForTesting public SubscriberAlarm(Consumer<String, NotificationAlarmService> consumer) {
+ this.consumer = consumer;
+ }
+}
import java.util.List;
import org.opendaylight.transportpce.common.converter.JsonStringConverter;
import org.opendaylight.transportpce.nbinotifications.consumer.Subscriber;
-import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.GetNotificationsServiceInput;
-import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.GetNotificationsServiceOutput;
-import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.GetNotificationsServiceOutputBuilder;
-import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.NbiNotificationsService;
-import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.get.notifications.service.output.NotificationService;
+import org.opendaylight.transportpce.nbinotifications.consumer.SubscriberAlarm;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.GetNotificationsAlarmServiceInput;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.GetNotificationsAlarmServiceOutput;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.GetNotificationsAlarmServiceOutputBuilder;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.GetNotificationsServiceInput;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.GetNotificationsServiceOutput;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.GetNotificationsServiceOutputBuilder;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.NbiNotificationsService;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.get.notifications.alarm.service.output.NotificationAlarmService;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.get.notifications.service.output.NotificationService;
import org.opendaylight.yangtools.yang.common.RpcResult;
import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
import org.slf4j.Logger;
public class NbiNotificationsImpl implements NbiNotificationsService {
private static final Logger LOG = LoggerFactory.getLogger(NbiNotificationsImpl.class);
private final JsonStringConverter<org.opendaylight.yang.gen.v1
- .nbi.notifications.rev201130.NotificationService> converter;
+ .nbi.notifications.rev210628.NotificationService> converterService;
+ private final JsonStringConverter<org.opendaylight.yang.gen.v1
+ .nbi.notifications.rev210628.NotificationAlarmService> converterAlarmService;
private final String server;
public NbiNotificationsImpl(JsonStringConverter<org.opendaylight.yang.gen.v1
- .nbi.notifications.rev201130.NotificationService> converter, String server) {
- this.converter = converter;
+ .nbi.notifications.rev210628.NotificationService> converterService,
+ JsonStringConverter<org.opendaylight.yang.gen.v1
+ .nbi.notifications.rev210628.NotificationAlarmService> converterAlarmService, String server) {
+ this.converterService = converterService;
+ this.converterAlarmService = converterAlarmService;
this.server = server;
}
LOG.warn("Missing mandatory params for input {}", input);
return RpcResultBuilder.success(new GetNotificationsServiceOutputBuilder().build()).buildFuture();
}
- Subscriber subscriber = new Subscriber(input.getIdConsumer(), input.getGroupId(), server, converter);
+ Subscriber subscriber = new Subscriber(input.getIdConsumer(), input.getGroupId(), server, converterService);
List<NotificationService> notificationServiceList = subscriber
.subscribeService(input.getConnectionType().getName());
GetNotificationsServiceOutputBuilder output = new GetNotificationsServiceOutputBuilder()
.setNotificationService(notificationServiceList);
return RpcResultBuilder.success(output.build()).buildFuture();
}
+
+ @Override
+ public ListenableFuture<RpcResult<GetNotificationsAlarmServiceOutput>> getNotificationsAlarmService(
+ GetNotificationsAlarmServiceInput input) {
+ LOG.info("RPC getNotificationsAlarmService received");
+ if (input == null || input.getIdConsumer() == null || input.getGroupId() == null) {
+ LOG.warn("Missing mandatory params for input {}", input);
+ return RpcResultBuilder.success(new GetNotificationsAlarmServiceOutputBuilder().build()).buildFuture();
+ }
+ SubscriberAlarm subscriberAlarm = new SubscriberAlarm(input.getIdConsumer(), input.getGroupId(), server,
+ converterAlarmService);
+ List<NotificationAlarmService> notificationAlarmServiceList = subscriberAlarm
+ .subscribeAlarm("alarm" + input.getConnectionType().getName());
+ GetNotificationsAlarmServiceOutputBuilder output = new GetNotificationsAlarmServiceOutputBuilder()
+ .setNotificationAlarmService(notificationAlarmServiceList);
+ return RpcResultBuilder.success(output.build()).buildFuture();
+ }
}
import org.opendaylight.transportpce.common.converter.JsonStringConverter;
import org.opendaylight.transportpce.nbinotifications.listener.NbiNotificationsListenerImpl;
import org.opendaylight.transportpce.nbinotifications.producer.Publisher;
-import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.NbiNotificationsListener;
-import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.NbiNotificationsService;
+import org.opendaylight.transportpce.nbinotifications.producer.PublisherAlarm;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.NbiNotificationsListener;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.NbiNotificationsService;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
import org.opendaylight.yangtools.concepts.ObjectRegistration;
import org.slf4j.Logger;
public class NbiNotificationsProvider {
private static final Logger LOG = LoggerFactory.getLogger(NbiNotificationsProvider.class);
- private static Map<String, Publisher> publishersMap = new HashMap<>();
+ private static Map<String, Publisher> publishersServiceMap = new HashMap<>();
+ private static Map<String, PublisherAlarm> publishersAlarmMap = new HashMap<>();
private final RpcProviderService rpcService;
private ObjectRegistration<NbiNotificationsService> rpcRegistration;
private ListenerRegistration<NbiNotificationsListener> listenerRegistration;
private NotificationService notificationService;
private final JsonStringConverter<org.opendaylight.yang.gen.v1
- .nbi.notifications.rev201130.NotificationService> converter;
- private final String suscriberServer;
+ .nbi.notifications.rev210628.NotificationService> converterService;
+ private final JsonStringConverter<org.opendaylight.yang.gen.v1
+ .nbi.notifications.rev210628.NotificationAlarmService> converterAlarmService;
+ private final String subscriberServer;
- public NbiNotificationsProvider(List<String> topics,
- String suscriberServer, String publisherServer,
+ public NbiNotificationsProvider(List<String> topicsService, List<String> topicsAlarm,
+ String subscriberServer, String publisherServer,
RpcProviderService rpcProviderService, NotificationService notificationService,
BindingDOMCodecServices bindingDOMCodecServices) {
this.rpcService = rpcProviderService;
this.notificationService = notificationService;
- converter = new JsonStringConverter<>(bindingDOMCodecServices);
- for (String topic: topics) {
+ converterService = new JsonStringConverter<>(bindingDOMCodecServices);
+ for (String topic: topicsService) {
+ LOG.info("Creating publisher for topic {}", topic);
+ publishersServiceMap.put(topic, new Publisher(topic, publisherServer, converterService));
+ }
+ converterAlarmService = new JsonStringConverter<>(bindingDOMCodecServices);
+ for (String topic: topicsAlarm) {
LOG.info("Creating publisher for topic {}", topic);
- publishersMap.put(topic, new Publisher(topic, publisherServer, converter));
+ publishersAlarmMap.put(topic, new PublisherAlarm(topic, publisherServer, converterAlarmService));
}
- this.suscriberServer = suscriberServer;
+ this.subscriberServer = subscriberServer;
}
/**
public void init() {
LOG.info("NbiNotificationsProvider Session Initiated");
rpcRegistration = rpcService.registerRpcImplementation(NbiNotificationsService.class,
- new NbiNotificationsImpl(converter, suscriberServer));
+ new NbiNotificationsImpl(converterService, converterAlarmService, subscriberServer));
listenerRegistration = notificationService.registerNotificationListener(
- new NbiNotificationsListenerImpl(publishersMap));
+ new NbiNotificationsListenerImpl(publishersServiceMap, publishersAlarmMap));
}
/**
* Method called when the blueprint container is destroyed.
*/
public void close() {
- for (Publisher publisher : publishersMap.values()) {
+ for (Publisher publisher : publishersServiceMap.values()) {
publisher.close();
}
+ for (PublisherAlarm publisherAlarm : publishersAlarmMap.values()) {
+ publisherAlarm.close();
+ }
rpcRegistration.close();
listenerRegistration.close();
LOG.info("NbiNotificationsProvider Closed");
*/
package org.opendaylight.transportpce.nbinotifications.listener;
-import java.util.HashMap;
import java.util.Map;
import org.opendaylight.transportpce.nbinotifications.producer.Publisher;
-import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.NbiNotificationsListener;
-import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.NotificationServiceBuilder;
-import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.PublishNotificationService;
+import org.opendaylight.transportpce.nbinotifications.producer.PublisherAlarm;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.NbiNotificationsListener;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.NotificationAlarmServiceBuilder;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.NotificationServiceBuilder;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.PublishNotificationAlarmService;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.PublishNotificationService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class NbiNotificationsListenerImpl implements NbiNotificationsListener {
private static final Logger LOG = LoggerFactory.getLogger(NbiNotificationsListenerImpl.class);
- private Map<String, Publisher> publishersMap = new HashMap<>();
+ private Map<String, Publisher> publishersServiceMap;
+ private Map<String, PublisherAlarm> publishersAlarmMap;
- public NbiNotificationsListenerImpl(Map<String, Publisher> publishersMap) {
- this.publishersMap = publishersMap;
+ public NbiNotificationsListenerImpl(Map<String, Publisher> publishersServiceMap,
+ Map<String, PublisherAlarm> publishersAlarmMap) {
+ this.publishersServiceMap = publishersServiceMap;
+ this.publishersAlarmMap = publishersAlarmMap;
}
@Override
public void onPublishNotificationService(PublishNotificationService notification) {
LOG.info("Receiving request for publishing notification service");
String topic = notification.getTopic();
- if (!publishersMap.containsKey(topic)) {
+ if (!publishersServiceMap.containsKey(topic)) {
LOG.error("Unknown topic {}", topic);
return;
}
- Publisher publisher = publishersMap.get(topic);
+ Publisher publisher = publishersServiceMap.get(topic);
publisher.sendEvent(new NotificationServiceBuilder().setCommonId(notification.getCommonId())
.setConnectionType(notification.getConnectionType()).setMessage(notification.getMessage())
.setOperationalState(notification.getOperationalState())
.setResponseFailed(notification.getResponseFailed())
- .setServiceAEnd(notification.getServiceAEnd()).setServiceName(notification.getServiceName())
+ .setServiceAEnd(notification.getServiceAEnd())
+ .setServiceName(notification.getServiceName())
.setServiceZEnd(notification.getServiceZEnd()).build());
-
}
+ @Override
+ public void onPublishNotificationAlarmService(PublishNotificationAlarmService notification) {
+ LOG.info("Receiving request for publishing notification alarm service");
+ String topic = notification.getTopic();
+ if (!publishersAlarmMap.containsKey(topic)) {
+ LOG.error("Unknown topic {}", topic);
+ return;
+ }
+ PublisherAlarm publisherAlarm = publishersAlarmMap.get(topic);
+ publisherAlarm.sendEvent(new NotificationAlarmServiceBuilder().setConnectionType(notification
+ .getConnectionType())
+ .setMessage(notification.getMessage())
+ .setOperationalState(notification.getOperationalState())
+ .setServiceName(notification.getServiceName())
+ .build());
+ }
}
import org.opendaylight.transportpce.nbinotifications.serialization.ConfigConstants;
import org.opendaylight.transportpce.nbinotifications.serialization.NotificationServiceSerializer;
import org.opendaylight.transportpce.nbinotifications.utils.NbiNotificationsUtils;
-import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.NotificationService;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.NotificationService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
--- /dev/null
+/*
+ * Copyright © 2021 Orange, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.transportpce.nbinotifications.producer;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.util.Properties;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.opendaylight.transportpce.common.converter.JsonStringConverter;
+import org.opendaylight.transportpce.nbinotifications.serialization.ConfigConstants;
+import org.opendaylight.transportpce.nbinotifications.serialization.NotificationAlarmServiceSerializer;
+import org.opendaylight.transportpce.nbinotifications.utils.NbiNotificationsUtils;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.NotificationAlarmService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class PublisherAlarm {
+ private static final Logger LOG = LoggerFactory.getLogger(PublisherAlarm.class);
+
+ private final String id;
+ private final Producer<String, NotificationAlarmService> producer;
+
+ public PublisherAlarm(String id, String publisherServer, JsonStringConverter<NotificationAlarmService> serializer) {
+ Properties properties = NbiNotificationsUtils.loadProperties("publisher.properties");
+ properties.put(ProducerConfig.CLIENT_ID_CONFIG, id);
+ if (publisherServer != null && !publisherServer.isBlank()) {
+ properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, publisherServer);
+ }
+ properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+ properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG , NotificationAlarmServiceSerializer.class);
+ properties.put(ConfigConstants.CONVERTER , serializer);
+ LOG.info("Creationg publisher for id {} with properties {}", id, properties);
+ producer = new KafkaProducer<>(properties);
+ this.id = id;
+ }
+
+ @VisibleForTesting
+ PublisherAlarm(String id, Producer<String, NotificationAlarmService> producer) {
+ this.producer = producer;
+ this.id = id;
+ }
+
+ public void close() {
+ producer.close();
+ }
+
+ public void sendEvent(NotificationAlarmService notificationAlarmService) {
+ LOG.info("SendEvent request to topic '{}' ", notificationAlarmService.getConnectionType().getName());
+ producer.send(new ProducerRecord<>("alarm" + notificationAlarmService.getConnectionType().getName(),
+ id, notificationAlarmService));
+ producer.flush();
+ }
+}
--- /dev/null
+/*
+ * Copyright © 2021 Orange, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.transportpce.nbinotifications.serialization;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.opendaylight.transportpce.common.converter.JsonStringConverter;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.get.notifications.alarm.service.output.NotificationAlarmService;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.get.notifications.alarm.service.output.NotificationAlarmServiceBuilder;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.codec.gson.JSONCodecFactorySupplier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class NotificationAlarmServiceDeserializer implements Deserializer<NotificationAlarmService> {
+ private static final Logger LOG = LoggerFactory.getLogger(NotificationAlarmServiceDeserializer.class);
+ private JsonStringConverter<org.opendaylight.yang.gen.v1
+ .nbi.notifications.rev210628.NotificationAlarmService> converter;
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void configure(Map<String, ?> configs, boolean isKey) {
+ LOG.info("Deserializer configuration {}", configs);
+ if (configs.containsKey(ConfigConstants.CONVERTER)
+ && configs.get(ConfigConstants.CONVERTER) instanceof JsonStringConverter<?>) {
+ converter = (JsonStringConverter<org.opendaylight.yang.gen.v1
+ .nbi.notifications.rev210628.NotificationAlarmService>) configs
+ .get(ConfigConstants.CONVERTER);
+ }
+ }
+
+ @Override
+ public NotificationAlarmService deserialize(String topic, byte[] data) {
+ if (converter == null) {
+ throw new IllegalArgumentException(
+ "Converter should be configured through configure method of deserializer");
+ }
+ String value = new String(data, StandardCharsets.UTF_8);
+ // The message published is
+ // org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.NotificationService
+ // we have to map it to
+ // org.opendaylight.yang.gen
+ // .v1.nbi.notifications.rev201130.get.notifications.service.output.NotificationService
+ org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.NotificationAlarmService mappedString = converter
+ .createDataObjectFromJsonString(YangInstanceIdentifier.of(
+ org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.NotificationAlarmService.QNAME),
+ value,
+ JSONCodecFactorySupplier.RFC7951);
+ if (mappedString != null) {
+ LOG.info("Reading event {}", mappedString);
+ return new NotificationAlarmServiceBuilder().setConnectionType(mappedString.getConnectionType())
+ .setMessage(mappedString.getMessage())
+ .setOperationalState(mappedString.getOperationalState())
+ .setServiceName(mappedString.getServiceName())
+ .build();
+ }
+ return null;
+ }
+}
--- /dev/null
+/*
+ * Copyright © 2021 Orange, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.transportpce.nbinotifications.serialization;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+import org.apache.kafka.common.serialization.Serializer;
+import org.opendaylight.transportpce.common.converter.JsonStringConverter;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.NotificationAlarmService;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.codec.gson.JSONCodecFactorySupplier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class NotificationAlarmServiceSerializer implements Serializer<NotificationAlarmService> {
+ private static final Logger LOG = LoggerFactory.getLogger(NotificationAlarmServiceSerializer.class);
+ private JsonStringConverter<NotificationAlarmService> converter;
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void configure(Map<String, ?> configs, boolean isKey) {
+ LOG.info("Deserializer configuration {}", configs);
+ if (configs.containsKey(ConfigConstants.CONVERTER)
+ && configs.get(ConfigConstants.CONVERTER) instanceof JsonStringConverter<?>) {
+ converter = (JsonStringConverter<NotificationAlarmService>) configs.get(ConfigConstants.CONVERTER);
+ }
+ }
+
+ @Override
+ public byte[] serialize(String topic, NotificationAlarmService data) {
+ if (converter == null) {
+ throw new IllegalArgumentException(
+ "Converter should be" + "configured through configure method of serializer");
+ }
+ if (data == null) {
+ return new byte[0];
+ }
+ try {
+ InstanceIdentifier<NotificationAlarmService> iid =
+ InstanceIdentifier.builder(NotificationAlarmService.class).build();
+ String serialized = converter.createJsonStringFromDataObject(iid, data, JSONCodecFactorySupplier.RFC7951);
+ LOG.info("Serialized event {}", serialized);
+ return serialized.getBytes(StandardCharsets.UTF_8);
+ } catch (IOException e) {
+ return new byte[0];
+ }
+ }
+}
import java.util.Map;
import org.apache.kafka.common.serialization.Deserializer;
import org.opendaylight.transportpce.common.converter.JsonStringConverter;
-import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.get.notifications.service.output.NotificationService;
-import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.get.notifications.service.output.NotificationServiceBuilder;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.get.notifications.service.output.NotificationService;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.get.notifications.service.output.NotificationServiceBuilder;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.codec.gson.JSONCodecFactorySupplier;
import org.slf4j.Logger;
public class NotificationServiceDeserializer implements Deserializer<NotificationService> {
private static final Logger LOG = LoggerFactory.getLogger(NotificationServiceDeserializer.class);
private JsonStringConverter<org.opendaylight.yang.gen.v1
- .nbi.notifications.rev201130.NotificationService> converter;
+ .nbi.notifications.rev210628.NotificationService> converter;
@SuppressWarnings("unchecked")
@Override
if (configs.containsKey(ConfigConstants.CONVERTER)
&& configs.get(ConfigConstants.CONVERTER) instanceof JsonStringConverter<?>) {
converter = (JsonStringConverter<org.opendaylight.yang.gen.v1
- .nbi.notifications.rev201130.NotificationService>) configs
+ .nbi.notifications.rev210628.NotificationService>) configs
.get(ConfigConstants.CONVERTER);
}
}
// we have to map it to
// org.opendaylight.yang.gen
// .v1.nbi.notifications.rev201130.get.notifications.service.output.NotificationService
- org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.NotificationService mappedString = converter
+ org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.NotificationService mappedString = converter
.createDataObjectFromJsonString(YangInstanceIdentifier.of(
- org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.NotificationService.QNAME),
+ org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.NotificationService.QNAME),
value,
JSONCodecFactorySupplier.RFC7951);
if (mappedString != null) {
import java.util.Map;
import org.apache.kafka.common.serialization.Serializer;
import org.opendaylight.transportpce.common.converter.JsonStringConverter;
-import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.NotificationService;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.NotificationService;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
import org.opendaylight.yangtools.yang.data.codec.gson.JSONCodecFactorySupplier;
import org.slf4j.Logger;
<value>RendererListener</value>
</list>
</argument>
+ <argument>
+ <list value-type="java.lang.String">
+ <value>ServiceListener</value>
+ </list>
+ </argument>
<argument value="${suscriber.server}"/>
<argument value="${publisher.server}"/>
<argument ref="rpcService" />
import org.junit.Test;
import org.opendaylight.transportpce.nbinotifications.utils.NotificationServiceDataUtils;
import org.opendaylight.transportpce.test.AbstractTest;
-import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.get.notifications.service.output.NotificationService;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.get.notifications.service.output.NotificationService;
public class SubscriberTest extends AbstractTest {
private static final String TOPIC = "topic";
import org.opendaylight.transportpce.common.converter.JsonStringConverter;
import org.opendaylight.transportpce.test.AbstractTest;
import org.opendaylight.yang.gen.v1.http.org.openroadm.common.service.types.rev190531.ConnectionType;
-import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.GetNotificationsServiceInputBuilder;
-import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.GetNotificationsServiceOutput;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.GetNotificationsServiceInputBuilder;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.GetNotificationsServiceOutput;
import org.opendaylight.yangtools.yang.common.RpcResult;
public class NbiNotificationsImplTest extends AbstractTest {
@Before
public void setUp() {
JsonStringConverter<org.opendaylight.yang.gen.v1
- .nbi.notifications.rev201130.NotificationService> converter = new JsonStringConverter<>(
+ .nbi.notifications.rev210628.NotificationService> converter = new JsonStringConverter<>(
getDataStoreContextUtil().getBindingDOMCodecServices());
- nbiNotificationsImpl = new NbiNotificationsImpl(converter, "localhost:8080");
+ JsonStringConverter<org.opendaylight.yang.gen.v1
+ .nbi.notifications.rev210628.NotificationAlarmService> converterAlarm = new JsonStringConverter<>(
+ getDataStoreContextUtil().getBindingDOMCodecServices());
+ nbiNotificationsImpl = new NbiNotificationsImpl(converter, converterAlarm,"localhost:8080");
}
public void getNotificationsServiceEmptyDataTest() throws InterruptedException, ExecutionException {
@Test
public void initTest() {
NbiNotificationsProvider provider = new NbiNotificationsProvider(
- Arrays.asList("topic1", "topic2"), "localhost:8080", "localhost:8080",
- rpcProviderRegistry, notificationService,
+ Arrays.asList("topic1", "topic2"), Arrays.asList("topic1", "topic2"), "localhost:8080",
+ "localhost:8080", rpcProviderRegistry, notificationService,
getDataStoreContextUtil().getBindingDOMCodecServices());
provider.init();
verify(rpcProviderRegistry, times(1))
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.opendaylight.transportpce.nbinotifications.producer.Publisher;
+import org.opendaylight.transportpce.nbinotifications.producer.PublisherAlarm;
import org.opendaylight.transportpce.test.AbstractTest;
import org.opendaylight.yang.gen.v1.http.org.openroadm.common.service.types.rev190531.ConnectionType;
import org.opendaylight.yang.gen.v1.http.org.openroadm.common.state.types.rev181130.State;
-import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.PublishNotificationService;
-import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.PublishNotificationServiceBuilder;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.PublishNotificationService;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.PublishNotificationServiceBuilder;
public class NbiNotificationsListenerImplTest extends AbstractTest {
@Mock
private Publisher publisher;
+ @Mock
+ private PublisherAlarm publisherAlarm;
@Before
public void setUp() {
@Test
public void onPublishNotificationServiceTest() {
- NbiNotificationsListenerImpl listener = new NbiNotificationsListenerImpl(Map.of("test", publisher));
+ NbiNotificationsListenerImpl listener = new NbiNotificationsListenerImpl(Map.of("test", publisher),
+ Map.of("test", publisherAlarm));
PublishNotificationService notification = new PublishNotificationServiceBuilder().setTopic("test")
.setCommonId("commonId").setConnectionType(ConnectionType.Service).setMessage("Service deleted")
.setOperationalState(State.OutOfService).setServiceName("service name").build();
@Test
public void onPublishNotificationServiceWrongTopicTest() {
- NbiNotificationsListenerImpl listener = new NbiNotificationsListenerImpl(Map.of("test", publisher));
+ NbiNotificationsListenerImpl listener = new NbiNotificationsListenerImpl(Map.of("test", publisher),
+ Map.of("test", publisherAlarm));
PublishNotificationService notification = new PublishNotificationServiceBuilder().setTopic("wrongtopic")
.setCommonId("commonId").setConnectionType(ConnectionType.Service).setMessage("Service deleted")
.setOperationalState(State.OutOfService).setServiceName("service name").build();
import org.opendaylight.transportpce.nbinotifications.serialization.ConfigConstants;
import org.opendaylight.transportpce.nbinotifications.serialization.NotificationServiceSerializer;
import org.opendaylight.transportpce.test.AbstractTest;
-import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.NotificationService;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.NotificationService;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.codec.gson.JSONCodecFactorySupplier;
import org.junit.Test;
import org.opendaylight.transportpce.common.converter.JsonStringConverter;
import org.opendaylight.transportpce.test.AbstractTest;
-import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.get.notifications.service.output.NotificationService;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.get.notifications.service.output.NotificationService;
public class NotificationServiceDeserializerTest extends AbstractTest {
@Test
public void deserializeTest() throws IOException {
- JsonStringConverter<org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.NotificationService> converter =
+ JsonStringConverter<org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.NotificationService> converter =
new JsonStringConverter<>(getDataStoreContextUtil().getBindingDOMCodecServices());
NotificationServiceDeserializer deserializer = new NotificationServiceDeserializer();
Map<String, Object> configs = Map.of(ConfigConstants.CONVERTER, converter);
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import org.junit.Test;
import org.opendaylight.transportpce.common.converter.JsonStringConverter;
import org.opendaylight.transportpce.test.AbstractTest;
-import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.NotificationService;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.NotificationService;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.codec.gson.JSONCodecFactorySupplier;
serializer.close();
assertNotNull("Serialized data should not be null", data);
String expectedJson = Files.readString(Paths.get("src/test/resources/expected_event.json"));
+ // Minify the json string
+ expectedJson = new ObjectMapper().readValue(expectedJson, JsonNode.class).toString();
assertEquals("The event should be equals", expectedJson, new String(data, StandardCharsets.UTF_8));
}
}
import org.opendaylight.yang.gen.v1.http.org.openroadm.common.service.types.rev190531.service.port.PortBuilder;
import org.opendaylight.yang.gen.v1.http.org.openroadm.common.state.types.rev181130.State;
import org.opendaylight.yang.gen.v1.http.org.openroadm.service.format.rev190531.ServiceFormat;
-import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.NotificationService;
-import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.NotificationServiceBuilder;
-import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.notification.service.ServiceAEndBuilder;
-import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.notification.service.ServiceZEndBuilder;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.NotificationService;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.NotificationServiceBuilder;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.notification.service.ServiceAEndBuilder;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.notification.service.ServiceZEndBuilder;
import org.opendaylight.yangtools.yang.common.Uint32;
public final class NotificationServiceDataUtils {
}
public static org.opendaylight.yang.gen.v1
- .nbi.notifications.rev201130.get.notifications.service.output.NotificationService buildReceivedEvent() {
+ .nbi.notifications.rev210628.get.notifications.service.output.NotificationService buildReceivedEvent() {
org.opendaylight.yang.gen.v1
- .nbi.notifications.rev201130.get.notifications.service.output.NotificationServiceBuilder
+ .nbi.notifications.rev210628.get.notifications.service.output.NotificationServiceBuilder
notificationServiceBuilder = new org.opendaylight.yang.gen.v1
- .nbi.notifications.rev201130.get.notifications.service.output.NotificationServiceBuilder()
+ .nbi.notifications.rev210628.get.notifications.service.output.NotificationServiceBuilder()
.setMessage("message")
.setServiceName("service1")
.setOperationalState(State.InService)
{
"notification-service": {
- "service-name": "service1",
- "service-a-end": {
+ "service-z-end": {
"service-format": "OC",
"node-id": "XPONDER-1-2",
"service-rate": 1,
}
}
},
- "common-id": "commond-id",
- "operational-state": "inService",
"connection-type": "service",
- "service-z-end": {
+ "operational-state": "inService",
+ "common-id": "commond-id",
+ "service-a-end": {
"service-format": "OC",
"node-id": "XPONDER-1-2",
"service-rate": 1,
}
}
},
+ "service-name": "service1",
"message": "message",
"response-failed": ""
}
-{"notification-service":{"service-name":"service1","service-a-end":{"service-format":"OC","node-id":"XPONDER-1-2","service-rate":1,"clli":"clli","tx-direction":{"port":{"port-slot":"port slot","port-device-name":"device name","port-rack":"port rack","port-shelf":"port shelf","port-type":"port type","port-sub-slot":"port subslot","port-name":"port name"},"lgx":{"lgx-port-name":"lgx port name","lgx-port-shelf":"lgx port shelf","lgx-port-rack":"lgx port rack","lgx-device-name":"lgx device name"}},"rx-direction":{"port":{"port-slot":"port slot","port-device-name":"device name","port-rack":"port rack","port-shelf":"port shelf","port-type":"port type","port-sub-slot":"port subslot","port-name":"port name"},"lgx":{"lgx-port-name":"lgx port name","lgx-port-shelf":"lgx port shelf","lgx-port-rack":"lgx port rack","lgx-device-name":"lgx device name"}}},"common-id":"commond-id","operational-state":"inService","connection-type":"service","service-z-end":{"service-format":"OC","node-id":"XPONDER-1-2","service-rate":1,"clli":"clli","tx-direction":{"port":{"port-slot":"port slot","port-device-name":"device name","port-rack":"port rack","port-shelf":"port shelf","port-type":"port type","port-sub-slot":"port subslot","port-name":"port name"},"lgx":{"lgx-port-name":"lgx port name","lgx-port-shelf":"lgx port shelf","lgx-port-rack":"lgx port rack","lgx-device-name":"lgx device name"}},"rx-direction":{"port":{"port-slot":"port slot","port-device-name":"device name","port-rack":"port rack","port-shelf":"port shelf","port-type":"port type","port-sub-slot":"port subslot","port-name":"port name"},"lgx":{"lgx-port-name":"lgx port name","lgx-port-shelf":"lgx port shelf","lgx-port-rack":"lgx port rack","lgx-device-name":"lgx device name"}}},"message":"message","response-failed":""}}
\ No newline at end of file
+{
+ "notification-service": {
+ "service-z-end": {
+ "clli": "clli",
+ "service-rate": 1,
+ "node-id": "XPONDER-1-2",
+ "rx-direction": {
+ "lgx": {
+ "lgx-port-name": "lgx port name",
+ "lgx-port-shelf": "lgx port shelf",
+ "lgx-port-rack": "lgx port rack",
+ "lgx-device-name": "lgx device name"
+ },
+ "port": {
+ "port-shelf": "port shelf",
+ "port-rack": "port rack",
+ "port-device-name": "device name",
+ "port-name": "port name",
+ "port-slot": "port slot",
+ "port-type": "port type",
+ "port-sub-slot": "port subslot"
+ }
+ },
+ "service-format": "OC",
+ "tx-direction": {
+ "lgx": {
+ "lgx-port-name": "lgx port name",
+ "lgx-port-shelf": "lgx port shelf",
+ "lgx-port-rack": "lgx port rack",
+ "lgx-device-name": "lgx device name"
+ },
+ "port": {
+ "port-shelf": "port shelf",
+ "port-rack": "port rack",
+ "port-device-name": "device name",
+ "port-name": "port name",
+ "port-slot": "port slot",
+ "port-type": "port type",
+ "port-sub-slot": "port subslot"
+ }
+ }
+ },
+ "connection-type": "service",
+ "operational-state": "inService",
+ "common-id": "commond-id",
+ "service-a-end": {
+ "clli": "clli",
+ "service-rate": 1,
+ "node-id": "XPONDER-1-2",
+ "rx-direction": {
+ "lgx": {
+ "lgx-port-name": "lgx port name",
+ "lgx-port-shelf": "lgx port shelf",
+ "lgx-port-rack": "lgx port rack",
+ "lgx-device-name": "lgx device name"
+ },
+ "port": {
+ "port-shelf": "port shelf",
+ "port-rack": "port rack",
+ "port-device-name": "device name",
+ "port-name": "port name",
+ "port-slot": "port slot",
+ "port-type": "port type",
+ "port-sub-slot": "port subslot"
+ }
+ },
+ "service-format": "OC",
+ "tx-direction": {
+ "lgx": {
+ "lgx-port-name": "lgx port name",
+ "lgx-port-shelf": "lgx port shelf",
+ "lgx-port-rack": "lgx port rack",
+ "lgx-device-name": "lgx device name"
+ },
+ "port": {
+ "port-shelf": "port shelf",
+ "port-rack": "port rack",
+ "port-device-name": "device name",
+ "port-name": "port name",
+ "port-slot": "port slot",
+ "port-type": "port type",
+ "port-sub-slot": "port subslot"
+ }
+ }
+ },
+ "service-name": "service1",
+ "response-failed": "",
+ "message": "message"
+ }
+}
\ No newline at end of file
import org.opendaylight.yang.gen.v1.http.org.openroadm.service.rev190531.service.delete.input.ServiceDeleteReqInfo.TailRetention;
import org.opendaylight.yang.gen.v1.http.org.openroadm.service.rev190531.service.delete.input.ServiceDeleteReqInfoBuilder;
import org.opendaylight.yang.gen.v1.http.org.openroadm.service.rev190531.service.list.Services;
-import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.PublishNotificationService;
-import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.PublishNotificationServiceBuilder;
-import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.notification.service.ServiceAEndBuilder;
-import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.notification.service.ServiceZEndBuilder;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.PublishNotificationService;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.PublishNotificationServiceBuilder;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.notification.service.ServiceAEndBuilder;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.notification.service.ServiceZEndBuilder;
import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.yang.types.rev130715.DateAndTime;
import org.opendaylight.yangtools.yang.common.RpcResult;
import org.slf4j.Logger;
package org.opendaylight.transportpce.servicehandler.impl;
import org.opendaylight.mdsal.binding.api.DataBroker;
+import org.opendaylight.mdsal.binding.api.DataTreeIdentifier;
import org.opendaylight.mdsal.binding.api.NotificationService;
import org.opendaylight.mdsal.binding.api.RpcProviderService;
+import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
import org.opendaylight.transportpce.servicehandler.listeners.NetworkModelListenerImpl;
import org.opendaylight.transportpce.servicehandler.listeners.PceListenerImpl;
import org.opendaylight.transportpce.servicehandler.listeners.RendererListenerImpl;
+import org.opendaylight.transportpce.servicehandler.listeners.ServiceListener;
import org.opendaylight.transportpce.servicehandler.service.ServiceDataStoreOperations;
import org.opendaylight.yang.gen.v1.http.org.opendaylight.transportpce.networkmodel.rev201116.TransportpceNetworkmodelListener;
import org.opendaylight.yang.gen.v1.http.org.opendaylight.transportpce.pce.rev210701.TransportpcePceListener;
import org.opendaylight.yang.gen.v1.http.org.opendaylight.transportpce.renderer.rev210618.TransportpceRendererListener;
import org.opendaylight.yang.gen.v1.http.org.openroadm.service.rev190531.OrgOpenroadmServiceService;
+import org.opendaylight.yang.gen.v1.http.org.openroadm.service.rev190531.ServiceList;
+import org.opendaylight.yang.gen.v1.http.org.openroadm.service.rev190531.service.list.Services;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
import org.opendaylight.yangtools.concepts.ObjectRegistration;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ServicehandlerProvider {
private static final Logger LOG = LoggerFactory.getLogger(ServicehandlerProvider.class);
+ private static final InstanceIdentifier<Services> SERVICE = InstanceIdentifier.builder(ServiceList.class)
+ .child(Services.class).build();
private final DataBroker dataBroker;
private final RpcProviderService rpcService;
private final NotificationService notificationService;
private ListenerRegistration<TransportpcePceListener> pcelistenerRegistration;
+ private ListenerRegistration<ServiceListener> serviceDataTreeChangeListenerRegistration;
private ListenerRegistration<TransportpceRendererListener> rendererlistenerRegistration;
private ListenerRegistration<TransportpceNetworkmodelListener> networkmodellistenerRegistration;
private ObjectRegistration<OrgOpenroadmServiceService> rpcRegistration;
private ServiceDataStoreOperations serviceDataStoreOperations;
private PceListenerImpl pceListenerImpl;
+ private ServiceListener serviceListener;
private RendererListenerImpl rendererListenerImpl;
private NetworkModelListenerImpl networkModelListenerImpl;
private ServicehandlerImpl servicehandler;
public ServicehandlerProvider(final DataBroker dataBroker, RpcProviderService rpcProviderService,
NotificationService notificationService, ServiceDataStoreOperations serviceDataStoreOperations,
- PceListenerImpl pceListenerImpl, RendererListenerImpl rendererListenerImpl,
+ PceListenerImpl pceListenerImpl, ServiceListener serviceListener, RendererListenerImpl rendererListenerImpl,
NetworkModelListenerImpl networkModelListenerImpl, ServicehandlerImpl servicehandler) {
this.dataBroker = dataBroker;
this.rpcService = rpcProviderService;
this.serviceDataStoreOperations = serviceDataStoreOperations;
this.serviceDataStoreOperations.initialize();
this.pceListenerImpl = pceListenerImpl;
+ this.serviceListener = serviceListener;
this.rendererListenerImpl = rendererListenerImpl;
this.networkModelListenerImpl = networkModelListenerImpl;
this.servicehandler = servicehandler;
public void init() {
LOG.info("ServicehandlerProvider Session Initiated");
pcelistenerRegistration = notificationService.registerNotificationListener(pceListenerImpl);
+ serviceDataTreeChangeListenerRegistration = dataBroker.registerDataTreeChangeListener(
+ DataTreeIdentifier.create(LogicalDatastoreType.OPERATIONAL, SERVICE), serviceListener);
rendererlistenerRegistration = notificationService.registerNotificationListener(rendererListenerImpl);
networkmodellistenerRegistration = notificationService.registerNotificationListener(networkModelListenerImpl);
rpcRegistration = rpcService.registerRpcImplementation(OrgOpenroadmServiceService.class, servicehandler);
public void close() {
LOG.info("ServicehandlerProvider Closed");
pcelistenerRegistration.close();
+ serviceDataTreeChangeListenerRegistration.close();
rendererlistenerRegistration.close();
networkmodellistenerRegistration.close();
rpcRegistration.close();
import org.opendaylight.yang.gen.v1.http.org.transportpce.b.c._interface.service.types.rev200128.RpcStatusEx;
import org.opendaylight.yang.gen.v1.http.org.transportpce.b.c._interface.service.types.rev200128.response.parameters.sp.ResponseParameters;
import org.opendaylight.yang.gen.v1.http.org.transportpce.b.c._interface.service.types.rev200128.response.parameters.sp.ResponseParametersBuilder;
-import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.PublishNotificationService;
-import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.PublishNotificationServiceBuilder;
-import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.notification.service.ServiceAEndBuilder;
-import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.notification.service.ServiceZEndBuilder;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.PublishNotificationService;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.PublishNotificationServiceBuilder;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.notification.service.ServiceAEndBuilder;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.notification.service.ServiceZEndBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.opendaylight.yang.gen.v1.http.org.openroadm.equipment.states.types.rev191129.AdminStates;
import org.opendaylight.yang.gen.v1.http.org.openroadm.service.rev190531.service.list.Services;
import org.opendaylight.yang.gen.v1.http.transportpce.topology.rev210511.OtnLinkType;
-import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.PublishNotificationService;
-import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.PublishNotificationServiceBuilder;
-import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.notification.service.ServiceAEndBuilder;
-import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.notification.service.ServiceZEndBuilder;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.PublishNotificationService;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.PublishNotificationServiceBuilder;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.notification.service.ServiceAEndBuilder;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.notification.service.ServiceZEndBuilder;
import org.opendaylight.yangtools.yang.common.Uint32;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
--- /dev/null
+/*
+ * Copyright © 2021 Orange and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.transportpce.servicehandler.listeners;
+
+import java.util.Collection;
+import org.opendaylight.mdsal.binding.api.DataBroker;
+import org.opendaylight.mdsal.binding.api.DataObjectModification;
+import org.opendaylight.mdsal.binding.api.DataTreeChangeListener;
+import org.opendaylight.mdsal.binding.api.DataTreeModification;
+import org.opendaylight.mdsal.binding.api.NotificationPublishService;
+import org.opendaylight.yang.gen.v1.http.org.openroadm.common.state.types.rev181130.State;
+import org.opendaylight.yang.gen.v1.http.org.openroadm.service.rev190531.service.list.Services;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.PublishNotificationAlarmService;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.PublishNotificationAlarmServiceBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ServiceListener implements DataTreeChangeListener<Services> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(ServiceListener.class);
+ private static final String TOPIC = "ServiceListener";
+ private final DataBroker dataBroker;
+ private NotificationPublishService notificationPublishService;
+
+ public ServiceListener(final DataBroker dataBroker, NotificationPublishService notificationPublishService) {
+ this.dataBroker = dataBroker;
+ this.notificationPublishService = notificationPublishService;
+ }
+
+ public void onDataTreeChanged(Collection<DataTreeModification<Services>> changes) {
+ LOG.info("onDataTreeChanged - {}", this.getClass().getSimpleName());
+ for (DataTreeModification<Services> change : changes) {
+ DataObjectModification<Services> rootService = change.getRootNode();
+ if (rootService.getDataBefore() == null) {
+ continue;
+ }
+ String serviceName = rootService.getDataBefore().key().getServiceName();
+ switch (rootService.getModificationType()) {
+ case DELETE:
+ LOG.info("Service {} correctly deleted from controller", serviceName);
+ break;
+ case WRITE:
+ Services input = rootService.getDataAfter();
+ if (rootService.getDataBefore().getOperationalState() == State.InService
+ && rootService.getDataAfter().getOperationalState() == State.OutOfService) {
+ LOG.info("Service {} is becoming outOfService", serviceName);
+ sendNbiNotification(new PublishNotificationAlarmServiceBuilder()
+ .setServiceName(input.getServiceName())
+ .setConnectionType(input.getConnectionType())
+ .setMessage("The service is now outOfService")
+ .setOperationalState(State.OutOfService)
+ .setTopic(TOPIC)
+ .build());
+ }
+ else if (rootService.getDataBefore().getOperationalState() == State.OutOfService
+ && rootService.getDataAfter().getOperationalState() == State.InService) {
+ LOG.info("Service {} is becoming InService", serviceName);
+ sendNbiNotification(new PublishNotificationAlarmServiceBuilder()
+ .setServiceName(input.getServiceName())
+ .setConnectionType(input.getConnectionType())
+ .setMessage("The service is now inService")
+ .setOperationalState(State.InService)
+ .setTopic(TOPIC)
+ .build());
+ }
+ break;
+ default:
+ LOG.debug("Unknown modification type {}", rootService.getModificationType().name());
+ break;
+ }
+ }
+ }
+
+ /**
+ * Send notification to NBI notification in order to publish message.
+ *
+ * @param service PublishNotificationAlarmService
+ */
+ private void sendNbiNotification(PublishNotificationAlarmService service) {
+ try {
+ notificationPublishService.putNotification(service);
+ } catch (InterruptedException e) {
+ LOG.warn("Cannot send notification to nbi", e);
+ Thread.currentThread().interrupt();
+ }
+ }
+}
<argument ref="serviceDatastoreOperation" />
</bean>
+ <bean id="serviceListener" class="org.opendaylight.transportpce.servicehandler.listeners.ServiceListener">
+ <argument ref="dataBroker" />
+ <argument ref="notificationPublishService" />
+ </bean>
+
<bean id="rendererListener" class="org.opendaylight.transportpce.servicehandler.listeners.RendererListenerImpl">
<argument ref="pathComputationService" />
<argument ref="notificationPublishService" />
<argument ref="notificationService" />
<argument ref="serviceDatastoreOperation" />
<argument ref="pceListener" />
+ <argument ref="serviceListener" />
<argument ref="rendererListener" />
<argument ref="networkModelListener" />
<argument ref="serviceHandlerImpl" />
import org.opendaylight.transportpce.servicehandler.listeners.NetworkModelListenerImpl;
import org.opendaylight.transportpce.servicehandler.listeners.PceListenerImpl;
import org.opendaylight.transportpce.servicehandler.listeners.RendererListenerImpl;
+import org.opendaylight.transportpce.servicehandler.listeners.ServiceListener;
import org.opendaylight.transportpce.servicehandler.service.ServiceDataStoreOperations;
import org.opendaylight.transportpce.test.AbstractTest;
@Mock
PceListenerImpl pceListenerImpl;
+ @Mock
+ ServiceListener serviceListener;
+
@Mock
RendererListenerImpl rendererListenerImpl;
@Mock
ServicehandlerImpl servicehandler;
+
private AutoCloseable closeable;
@Before
public void testInitRegisterServiceHandlerToRpcRegistry() {
ServicehandlerProvider provider = new ServicehandlerProvider(
getDataBroker(), rpcProviderRegistry,
- getNotificationService() , serviceDataStoreOperations, pceListenerImpl, rendererListenerImpl,
- networkModelListenerImpl, servicehandler);
+ getNotificationService() , serviceDataStoreOperations, pceListenerImpl, serviceListener,
+ rendererListenerImpl, networkModelListenerImpl, servicehandler);
provider.init();