From de218a942bf32f32e41d37b1f49f9a35e86d5dc0 Mon Sep 17 00:00:00 2001 From: Thierry Jiao Date: Thu, 17 Dec 2020 18:04:59 +0100 Subject: [PATCH] Add new Maven module to manage NBI Notifications - Implement a new Maven module named nbinotifications containing classes capable of communicating with Kafka server - Add the class Subscriber capable of reading events from topics Kafka - Add the class Publisher capable of writing events from topics Kafka - Add yang file nbi-notifications to model the service notifications - Implement a new RPC API named GetNotificationsService capable of returning the notifications stored in a topic Kafka - Implement a new RPC API named publishNotificationService in charge of publishing a message in Kafka topic - Add a listener to the RPC GetNotificationsService - Add new blueprint in nbinotifications - Add unit tests to check the functionning of nbinotifications classes - Update pom.xml to implement the new module nbinotifications JIRA: TRNSPRTPCE-343 Signed-off-by: Thierry Jiao Change-Id: I74298292034e4a158b27dd5db47e63918026684b --- .../yang/nbi-notifications@2020-11-30.yang | 120 ++++++++++++++++++ nbinotifications/pom.xml | 71 +++++++++++ .../nbinotifications/consumer/Subscriber.java | 72 +++++++++++ .../impl/NbiNotificationsImpl.java | 51 ++++++++ .../impl/NbiNotificationsProvider.java | 77 +++++++++++ .../NbiNotificationsListenerImpl.java | 45 +++++++ .../nbinotifications/producer/Publisher.java | 59 +++++++++ .../serialization/ConfigConstants.java | 16 +++ .../NotificationServiceDeserializer.java | 67 ++++++++++ .../NotificationServiceSerializer.java | 53 ++++++++ .../utils/NbiNotificationsUtils.java | 38 ++++++ .../blueprint/nobinotifications-blueprint.xml | 35 +++++ .../src/main/resources/publisher.properties | 8 ++ .../src/main/resources/subscriber.properties | 5 + .../consumer/SubscriberTest.java | 57 +++++++++ .../impl/NbiNotificationsImplTest.java | 50 ++++++++ .../impl/NbiNotificationsProviderTest.java | 50 ++++++++ .../NbiNotificationsListenerImplTest.java | 54 ++++++++ .../producer/PublisherTest.java | 55 ++++++++ .../NotificationServiceDeserializerTest.java | 35 +++++ .../NotificationServiceSerializerTest.java | 44 +++++++ .../utils/NotificationServiceDataUtils.java | 98 ++++++++++++++ .../src/test/resources/event.json | 90 +++++++++++++ .../src/test/resources/expected_event.json | 1 + .../src/test/resources/publisher.properties | 8 ++ .../src/test/resources/subscriber.properties | 5 + pom.xml | 1 + 27 files changed, 1265 insertions(+) create mode 100644 api/src/main/yang/nbi-notifications@2020-11-30.yang create mode 100644 nbinotifications/pom.xml create mode 100644 nbinotifications/src/main/java/org/opendaylight/transportpce/nbinotifications/consumer/Subscriber.java create mode 100644 nbinotifications/src/main/java/org/opendaylight/transportpce/nbinotifications/impl/NbiNotificationsImpl.java create mode 100644 nbinotifications/src/main/java/org/opendaylight/transportpce/nbinotifications/impl/NbiNotificationsProvider.java create mode 100644 nbinotifications/src/main/java/org/opendaylight/transportpce/nbinotifications/listener/NbiNotificationsListenerImpl.java create mode 100644 nbinotifications/src/main/java/org/opendaylight/transportpce/nbinotifications/producer/Publisher.java create mode 100644 nbinotifications/src/main/java/org/opendaylight/transportpce/nbinotifications/serialization/ConfigConstants.java create mode 100644 nbinotifications/src/main/java/org/opendaylight/transportpce/nbinotifications/serialization/NotificationServiceDeserializer.java create mode 100644 nbinotifications/src/main/java/org/opendaylight/transportpce/nbinotifications/serialization/NotificationServiceSerializer.java create mode 100644 nbinotifications/src/main/java/org/opendaylight/transportpce/nbinotifications/utils/NbiNotificationsUtils.java create mode 100644 nbinotifications/src/main/resources/OSGI-INF/blueprint/nobinotifications-blueprint.xml create mode 100644 nbinotifications/src/main/resources/publisher.properties create mode 100644 nbinotifications/src/main/resources/subscriber.properties create mode 100644 nbinotifications/src/test/java/org/opendaylight/transportpce/nbinotifications/consumer/SubscriberTest.java create mode 100644 nbinotifications/src/test/java/org/opendaylight/transportpce/nbinotifications/impl/NbiNotificationsImplTest.java create mode 100644 nbinotifications/src/test/java/org/opendaylight/transportpce/nbinotifications/impl/NbiNotificationsProviderTest.java create mode 100644 nbinotifications/src/test/java/org/opendaylight/transportpce/nbinotifications/listener/NbiNotificationsListenerImplTest.java create mode 100644 nbinotifications/src/test/java/org/opendaylight/transportpce/nbinotifications/producer/PublisherTest.java create mode 100644 nbinotifications/src/test/java/org/opendaylight/transportpce/nbinotifications/serialization/NotificationServiceDeserializerTest.java create mode 100644 nbinotifications/src/test/java/org/opendaylight/transportpce/nbinotifications/serialization/NotificationServiceSerializerTest.java create mode 100644 nbinotifications/src/test/java/org/opendaylight/transportpce/nbinotifications/utils/NotificationServiceDataUtils.java create mode 100644 nbinotifications/src/test/resources/event.json create mode 100644 nbinotifications/src/test/resources/expected_event.json create mode 100644 nbinotifications/src/test/resources/publisher.properties create mode 100644 nbinotifications/src/test/resources/subscriber.properties diff --git a/api/src/main/yang/nbi-notifications@2020-11-30.yang b/api/src/main/yang/nbi-notifications@2020-11-30.yang new file mode 100644 index 000000000..aa068cd53 --- /dev/null +++ b/api/src/main/yang/nbi-notifications@2020-11-30.yang @@ -0,0 +1,120 @@ +module nbi-notifications { + yang-version 1; + namespace "nbi-notifications"; + prefix nbinotifications; + + import org-openroadm-service { + prefix oor-service; + revision-date 2019-05-31; + } + import org-openroadm-common-service-types { + prefix org-openroadm-common-service-types; + revision-date 2019-05-31; + } + import org-openroadm-common-state-types { + prefix org-openroadm-common-state-types; + revision-date 2018-11-30; + } + + organization + "transportPCE"; + contact + "transportPCE committers - ODL"; + description + "YANG definitions for using REST API in NBI notifications module. Copyright + (c) 2020 ORANGE and others. All rights reserved."; + + revision 2020-11-30 { + description + "Initial revision of NBI notifications"; + } + + grouping notification-service { + leaf message { + type string; + mandatory true; + description + "Message for the specified service"; + } + leaf service-name { + type string; + mandatory true; + description + "Identifier for the service to be created in the ROADM network, e.g., CLFI, CLCI, etc. + This is reported against the service, but may not get reflected in the service in the network."; + } + leaf common-id { + type string; + description + "To be used by the ROADM controller to identify the routing constraints + received from planning application (PED)."; + } + leaf connection-type { + type org-openroadm-common-service-types:connection-type; + mandatory true; + } + container service-a-end { + uses org-openroadm-common-service-types:service-endpoint; + } + container service-z-end { + uses org-openroadm-common-service-types:service-endpoint; + } + leaf response-failed { + type string; + description + "Response of the error if the service request encountered an anomaly"; + } + leaf operational-state { + type org-openroadm-common-state-types:state; + config false; + description + "Operational State: Actual state of service"; + } + } + + container notification-service { + description + "Model used to send a notification from a service request"; + uses notification-service; + } + + rpc get-notifications-service { + description "Get the notifications service send by ServiceHandler by filtering through connection type"; + input { + leaf connection-type { + type org-openroadm-common-service-types:connection-type; + mandatory true; + description + "Type connection of the service "; + } + leaf id-consumer { + type string; + mandatory true; + description + "Unique ID for the consumer"; + } + leaf group-id { + type string; + mandatory true; + description + "ID Group for the consumer"; + } + } + output { + list notification-service { + uses notification-service; + } + } + } + + notification publish-notification-service { + description "Publish the notifications service for topic"; + leaf topic { + type string; + mandatory true; + description + "Topic where to send the notification service"; + } + uses notification-service; + } +} diff --git a/nbinotifications/pom.xml b/nbinotifications/pom.xml new file mode 100644 index 000000000..e49c0f15d --- /dev/null +++ b/nbinotifications/pom.xml @@ -0,0 +1,71 @@ + + + + 4.0.0 + + + org.opendaylight.mdsal + binding-parent + 7.0.5 + + + + org.opendaylight.transportpce + transportpce-nbinotifications + 3.0.0-SNAPSHOT + bundle + + + UTF-8 + 2.6.0 + + + + + ${project.groupId} + transportpce-common + ${project.version} + + + ${project.groupId} + transportpce-api + ${project.version} + + + ${project.groupId}.ordmodels + transportpce-ordmodels-service + ${project.version} + + + org.apache.kafka + kafka-clients + ${kafka.version} + + + org.opendaylight.mdsal + mdsal-binding-api + + + org.opendaylight.mdsal + mdsal-binding-dom-adapter + + + + ${project.groupId} + test-common + ${project.version} + test + + + com.fasterxml.jackson.core + jackson-databind + test + + + + diff --git a/nbinotifications/src/main/java/org/opendaylight/transportpce/nbinotifications/consumer/Subscriber.java b/nbinotifications/src/main/java/org/opendaylight/transportpce/nbinotifications/consumer/Subscriber.java new file mode 100644 index 000000000..cb044b126 --- /dev/null +++ b/nbinotifications/src/main/java/org/opendaylight/transportpce/nbinotifications/consumer/Subscriber.java @@ -0,0 +1,72 @@ +/* + * Copyright © 2020 Orange, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.transportpce.nbinotifications.consumer; + +import com.google.common.annotations.VisibleForTesting; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Properties; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.opendaylight.transportpce.common.converter.JsonStringConverter; +import org.opendaylight.transportpce.nbinotifications.serialization.ConfigConstants; +import org.opendaylight.transportpce.nbinotifications.serialization.NotificationServiceDeserializer; +import org.opendaylight.transportpce.nbinotifications.utils.NbiNotificationsUtils; +import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.get.notifications.service.output.NotificationService; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class Subscriber { + private static final Logger LOG = LoggerFactory.getLogger(Subscriber.class); + + private final Consumer consumer; + + public Subscriber(String id, String groupId, String suscriberServer, + JsonStringConverter deserializer) { + Properties propsConsumer = NbiNotificationsUtils.loadProperties("subscriber.properties"); + propsConsumer.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); + propsConsumer.put(ConsumerConfig.CLIENT_ID_CONFIG, id); + propsConsumer.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + propsConsumer.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG , NotificationServiceDeserializer.class); + propsConsumer.put(ConfigConstants.CONVERTER , deserializer); + if (suscriberServer != null && !suscriberServer.isBlank()) { + propsConsumer.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, suscriberServer); + } + LOG.info("Suscribing for group id {}, client config id {} with properties {}", groupId, id, propsConsumer); + consumer = new KafkaConsumer<>(propsConsumer); + } + + public List subscribeService(String topicName) { + LOG.info("Subscribe request to topic '{}' ", topicName); + consumer.subscribe(Collections.singleton(topicName)); + final ConsumerRecords consumerRecords = consumer.poll(Duration.ofMillis(1000)); + List notificationServiceList = new ArrayList<>(); + YangInstanceIdentifier.of(NotificationService.QNAME); + for (ConsumerRecord record : consumerRecords) { + if (record.value() != null) { + notificationServiceList.add(record.value()); + } + } + LOG.info("Getting records '{}' ", notificationServiceList); + consumer.unsubscribe(); + consumer.close(); + return notificationServiceList; + } + + @VisibleForTesting public Subscriber(Consumer consumer) { + this.consumer = consumer; + } +} diff --git a/nbinotifications/src/main/java/org/opendaylight/transportpce/nbinotifications/impl/NbiNotificationsImpl.java b/nbinotifications/src/main/java/org/opendaylight/transportpce/nbinotifications/impl/NbiNotificationsImpl.java new file mode 100644 index 000000000..073ce49f8 --- /dev/null +++ b/nbinotifications/src/main/java/org/opendaylight/transportpce/nbinotifications/impl/NbiNotificationsImpl.java @@ -0,0 +1,51 @@ +/* + * Copyright © 2020 Orange, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.transportpce.nbinotifications.impl; + +import com.google.common.util.concurrent.ListenableFuture; +import java.util.List; +import org.opendaylight.transportpce.common.converter.JsonStringConverter; +import org.opendaylight.transportpce.nbinotifications.consumer.Subscriber; +import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.GetNotificationsServiceInput; +import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.GetNotificationsServiceOutput; +import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.GetNotificationsServiceOutputBuilder; +import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.NbiNotificationsService; +import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.get.notifications.service.output.NotificationService; +import org.opendaylight.yangtools.yang.common.RpcResult; +import org.opendaylight.yangtools.yang.common.RpcResultBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class NbiNotificationsImpl implements NbiNotificationsService { + private static final Logger LOG = LoggerFactory.getLogger(NbiNotificationsImpl.class); + private final JsonStringConverter converter; + private final String server; + + public NbiNotificationsImpl(JsonStringConverter converter, String server) { + this.converter = converter; + this.server = server; + } + + @Override + public ListenableFuture> getNotificationsService( + GetNotificationsServiceInput input) { + LOG.info("RPC getNotificationsService received"); + if (input == null || input.getIdConsumer() == null || input.getGroupId() == null) { + LOG.warn("Missing mandatory params for input {}", input); + return RpcResultBuilder.success(new GetNotificationsServiceOutputBuilder().build()).buildFuture(); + } + Subscriber subscriber = new Subscriber(input.getIdConsumer(), input.getGroupId(), server, converter); + List notificationServiceList = subscriber + .subscribeService(input.getConnectionType().getName()); + GetNotificationsServiceOutputBuilder output = new GetNotificationsServiceOutputBuilder() + .setNotificationService(notificationServiceList); + return RpcResultBuilder.success(output.build()).buildFuture(); + } +} diff --git a/nbinotifications/src/main/java/org/opendaylight/transportpce/nbinotifications/impl/NbiNotificationsProvider.java b/nbinotifications/src/main/java/org/opendaylight/transportpce/nbinotifications/impl/NbiNotificationsProvider.java new file mode 100644 index 000000000..5f4103446 --- /dev/null +++ b/nbinotifications/src/main/java/org/opendaylight/transportpce/nbinotifications/impl/NbiNotificationsProvider.java @@ -0,0 +1,77 @@ +/* + * Copyright © 2020 Orange, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.transportpce.nbinotifications.impl; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.opendaylight.mdsal.binding.api.NotificationService; +import org.opendaylight.mdsal.binding.api.RpcProviderService; +import org.opendaylight.mdsal.binding.dom.codec.spi.BindingDOMCodecServices; +import org.opendaylight.transportpce.common.converter.JsonStringConverter; +import org.opendaylight.transportpce.nbinotifications.listener.NbiNotificationsListenerImpl; +import org.opendaylight.transportpce.nbinotifications.producer.Publisher; +import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.NbiNotificationsListener; +import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.NbiNotificationsService; +import org.opendaylight.yangtools.concepts.ListenerRegistration; +import org.opendaylight.yangtools.concepts.ObjectRegistration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class NbiNotificationsProvider { + + private static final Logger LOG = LoggerFactory.getLogger(NbiNotificationsProvider.class); + private static Map publishersMap = new HashMap<>(); + + private final RpcProviderService rpcService; + private ObjectRegistration rpcRegistration; + private ListenerRegistration listenerRegistration; + private NotificationService notificationService; + private final JsonStringConverter converter; + private final String suscriberServer; + + + public NbiNotificationsProvider(List topics, + String suscriberServer, String publisherServer, + RpcProviderService rpcProviderService, NotificationService notificationService, + BindingDOMCodecServices bindingDOMCodecServices) { + this.rpcService = rpcProviderService; + this.notificationService = notificationService; + converter = new JsonStringConverter<>(bindingDOMCodecServices); + for (String topic: topics) { + LOG.info("Creating publisher for topic {}", topic); + publishersMap.put(topic, new Publisher(topic, publisherServer, converter)); + } + this.suscriberServer = suscriberServer; + } + + /** + * Method called when the blueprint container is created. + */ + public void init() { + LOG.info("NbiNotificationsProvider Session Initiated"); + rpcRegistration = rpcService.registerRpcImplementation(NbiNotificationsService.class, + new NbiNotificationsImpl(converter, suscriberServer)); + listenerRegistration = notificationService.registerNotificationListener( + new NbiNotificationsListenerImpl(publishersMap)); + } + + /** + * Method called when the blueprint container is destroyed. + */ + public void close() { + for (Publisher publisher : publishersMap.values()) { + publisher.close(); + } + rpcRegistration.close(); + listenerRegistration.close(); + LOG.info("NbiNotificationsProvider Closed"); + } + +} diff --git a/nbinotifications/src/main/java/org/opendaylight/transportpce/nbinotifications/listener/NbiNotificationsListenerImpl.java b/nbinotifications/src/main/java/org/opendaylight/transportpce/nbinotifications/listener/NbiNotificationsListenerImpl.java new file mode 100644 index 000000000..0bcdf6bd2 --- /dev/null +++ b/nbinotifications/src/main/java/org/opendaylight/transportpce/nbinotifications/listener/NbiNotificationsListenerImpl.java @@ -0,0 +1,45 @@ +/* + * Copyright © 2020 Orange, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.transportpce.nbinotifications.listener; + +import java.util.HashMap; +import java.util.Map; +import org.opendaylight.transportpce.nbinotifications.producer.Publisher; +import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.NbiNotificationsListener; +import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.NotificationServiceBuilder; +import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.PublishNotificationService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class NbiNotificationsListenerImpl implements NbiNotificationsListener { + private static final Logger LOG = LoggerFactory.getLogger(NbiNotificationsListenerImpl.class); + private Map publishersMap = new HashMap<>(); + + public NbiNotificationsListenerImpl(Map publishersMap) { + this.publishersMap = publishersMap; + } + + @Override + public void onPublishNotificationService(PublishNotificationService notification) { + LOG.info("Receiving request for publishing notification service"); + String topic = notification.getTopic(); + if (!publishersMap.containsKey(topic)) { + LOG.error("Unknown topic {}", topic); + return; + } + Publisher publisher = publishersMap.get(topic); + publisher.sendEvent(new NotificationServiceBuilder().setCommonId(notification.getCommonId()) + .setConnectionType(notification.getConnectionType()).setMessage(notification.getMessage()) + .setOperationalState(notification.getOperationalState()) + .setResponseFailed(notification.getResponseFailed()) + .setServiceAEnd(notification.getServiceAEnd()).setServiceName(notification.getServiceName()) + .setServiceZEnd(notification.getServiceZEnd()).build()); + + } + +} diff --git a/nbinotifications/src/main/java/org/opendaylight/transportpce/nbinotifications/producer/Publisher.java b/nbinotifications/src/main/java/org/opendaylight/transportpce/nbinotifications/producer/Publisher.java new file mode 100644 index 000000000..21295a9b6 --- /dev/null +++ b/nbinotifications/src/main/java/org/opendaylight/transportpce/nbinotifications/producer/Publisher.java @@ -0,0 +1,59 @@ +/* + * Copyright © 2020 Orange, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.transportpce.nbinotifications.producer; + +import com.google.common.annotations.VisibleForTesting; +import java.util.Properties; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.serialization.StringSerializer; +import org.opendaylight.transportpce.common.converter.JsonStringConverter; +import org.opendaylight.transportpce.nbinotifications.serialization.ConfigConstants; +import org.opendaylight.transportpce.nbinotifications.serialization.NotificationServiceSerializer; +import org.opendaylight.transportpce.nbinotifications.utils.NbiNotificationsUtils; +import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.NotificationService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class Publisher { + private static final Logger LOG = LoggerFactory.getLogger(Publisher.class); + + private final String id; + private final Producer producer; + + public Publisher(String id, String publisherServer, JsonStringConverter serializer) { + Properties properties = NbiNotificationsUtils.loadProperties("publisher.properties"); + properties.put(ProducerConfig.CLIENT_ID_CONFIG, id); + if (publisherServer != null && !publisherServer.isBlank()) { + properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, publisherServer); + } + properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG , NotificationServiceSerializer.class); + properties.put(ConfigConstants.CONVERTER , serializer); + LOG.info("Creationg publisher for id {} with properties {}", id, properties); + producer = new KafkaProducer<>(properties); + this.id = id; + } + + @VisibleForTesting Publisher(String id, Producer producer) { + this.producer = producer; + this.id = id; + } + + public void close() { + producer.close(); + } + + public void sendEvent(NotificationService notificationService) { + LOG.info("SendEvent request to topic '{}' ", notificationService.getConnectionType().getName()); + producer.send(new ProducerRecord<>(notificationService.getConnectionType().getName(), id, notificationService)); + producer.flush(); + } +} diff --git a/nbinotifications/src/main/java/org/opendaylight/transportpce/nbinotifications/serialization/ConfigConstants.java b/nbinotifications/src/main/java/org/opendaylight/transportpce/nbinotifications/serialization/ConfigConstants.java new file mode 100644 index 000000000..b10b01646 --- /dev/null +++ b/nbinotifications/src/main/java/org/opendaylight/transportpce/nbinotifications/serialization/ConfigConstants.java @@ -0,0 +1,16 @@ +/* + * Copyright © 2021 Orange, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.transportpce.nbinotifications.serialization; + +public final class ConfigConstants { + private ConfigConstants() { + } + + public static final String CONVERTER = "converter"; + +} diff --git a/nbinotifications/src/main/java/org/opendaylight/transportpce/nbinotifications/serialization/NotificationServiceDeserializer.java b/nbinotifications/src/main/java/org/opendaylight/transportpce/nbinotifications/serialization/NotificationServiceDeserializer.java new file mode 100644 index 000000000..95f10fb63 --- /dev/null +++ b/nbinotifications/src/main/java/org/opendaylight/transportpce/nbinotifications/serialization/NotificationServiceDeserializer.java @@ -0,0 +1,67 @@ +/* + * Copyright © 2021 Orange, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.transportpce.nbinotifications.serialization; + +import java.nio.charset.StandardCharsets; +import java.util.Map; +import org.apache.kafka.common.serialization.Deserializer; +import org.opendaylight.transportpce.common.converter.JsonStringConverter; +import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.get.notifications.service.output.NotificationService; +import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.get.notifications.service.output.NotificationServiceBuilder; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.codec.gson.JSONCodecFactorySupplier; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class NotificationServiceDeserializer implements Deserializer { + private static final Logger LOG = LoggerFactory.getLogger(NotificationServiceDeserializer.class); + private JsonStringConverter converter; + + @SuppressWarnings("unchecked") + @Override + public void configure(Map configs, boolean isKey) { + LOG.info("Deserializer configuration {}", configs); + if (configs.containsKey(ConfigConstants.CONVERTER) + && configs.get(ConfigConstants.CONVERTER) instanceof JsonStringConverter) { + converter = (JsonStringConverter) configs + .get(ConfigConstants.CONVERTER); + } + } + + @Override + public NotificationService deserialize(String topic, byte[] data) { + if (converter == null) { + throw new IllegalArgumentException( + "Converter should be configured through configure method of deserializer"); + } + String value = new String(data, StandardCharsets.UTF_8); + // The message published is + // org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.NotificationService + // we have to map it to + // org.opendaylight.yang.gen + // .v1.nbi.notifications.rev201130.get.notifications.service.output.NotificationService + org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.NotificationService mappedString = converter + .createDataObjectFromJsonString(YangInstanceIdentifier.of( + org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.NotificationService.QNAME), + value, + JSONCodecFactorySupplier.RFC7951); + if (mappedString != null) { + LOG.info("Reading event {}", mappedString); + return new NotificationServiceBuilder().setCommonId(mappedString.getCommonId()) + .setConnectionType(mappedString.getConnectionType()).setMessage(mappedString.getMessage()) + .setOperationalState(mappedString.getOperationalState()) + .setResponseFailed(mappedString.getResponseFailed()).setServiceName(mappedString.getServiceName()) + .setServiceAEnd(mappedString.getServiceAEnd()).setServiceZEnd(mappedString.getServiceZEnd()) + .build(); + } + return null; + } + +} diff --git a/nbinotifications/src/main/java/org/opendaylight/transportpce/nbinotifications/serialization/NotificationServiceSerializer.java b/nbinotifications/src/main/java/org/opendaylight/transportpce/nbinotifications/serialization/NotificationServiceSerializer.java new file mode 100644 index 000000000..eb998746f --- /dev/null +++ b/nbinotifications/src/main/java/org/opendaylight/transportpce/nbinotifications/serialization/NotificationServiceSerializer.java @@ -0,0 +1,53 @@ +/* + * Copyright © 2021 Orange, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.transportpce.nbinotifications.serialization; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.Map; +import org.apache.kafka.common.serialization.Serializer; +import org.opendaylight.transportpce.common.converter.JsonStringConverter; +import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.NotificationService; +import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; +import org.opendaylight.yangtools.yang.data.codec.gson.JSONCodecFactorySupplier; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class NotificationServiceSerializer implements Serializer { + private static final Logger LOG = LoggerFactory.getLogger(NotificationServiceSerializer.class); + private JsonStringConverter converter; + + @SuppressWarnings("unchecked") + @Override + public void configure(Map configs, boolean isKey) { + LOG.info("Deserializer configuration {}", configs); + if (configs.containsKey(ConfigConstants.CONVERTER) + && configs.get(ConfigConstants.CONVERTER) instanceof JsonStringConverter) { + converter = (JsonStringConverter) configs.get(ConfigConstants.CONVERTER); + } + } + + @Override + public byte[] serialize(String topic, NotificationService data) { + if (converter == null) { + throw new IllegalArgumentException( + "Converter should be" + "configured through configure method of serializer"); + } + if (data == null) { + return new byte[0]; + } + try { + InstanceIdentifier iid = InstanceIdentifier.builder(NotificationService.class).build(); + String serialized = converter.createJsonStringFromDataObject(iid, data, JSONCodecFactorySupplier.RFC7951); + LOG.info("Serialized event {}", serialized); + return serialized.getBytes(StandardCharsets.UTF_8); + } catch (IOException e) { + return new byte[0]; + } + } +} diff --git a/nbinotifications/src/main/java/org/opendaylight/transportpce/nbinotifications/utils/NbiNotificationsUtils.java b/nbinotifications/src/main/java/org/opendaylight/transportpce/nbinotifications/utils/NbiNotificationsUtils.java new file mode 100644 index 000000000..d1a4ceeca --- /dev/null +++ b/nbinotifications/src/main/java/org/opendaylight/transportpce/nbinotifications/utils/NbiNotificationsUtils.java @@ -0,0 +1,38 @@ +/* + * Copyright © 2020 Orange, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.transportpce.nbinotifications.utils; + +import java.io.IOException; +import java.io.InputStream; +import java.util.Properties; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public final class NbiNotificationsUtils { + + private static final Logger LOG = LoggerFactory.getLogger(NbiNotificationsUtils.class); + + private NbiNotificationsUtils() { + } + + public static Properties loadProperties(String propertyFileName) { + Properties props = new Properties(); + InputStream inputStream = NbiNotificationsUtils.class.getClassLoader() + .getResourceAsStream(propertyFileName); + try { + if (inputStream != null) { + props.load(inputStream); + } else { + LOG.warn("Kafka property file '{}' is empty", propertyFileName); + } + } catch (IOException e) { + LOG.warn("Kafka property file '{}' was not found in the classpath", propertyFileName); + } + return props; + } +} diff --git a/nbinotifications/src/main/resources/OSGI-INF/blueprint/nobinotifications-blueprint.xml b/nbinotifications/src/main/resources/OSGI-INF/blueprint/nobinotifications-blueprint.xml new file mode 100644 index 000000000..9b3a3bd7f --- /dev/null +++ b/nbinotifications/src/main/resources/OSGI-INF/blueprint/nobinotifications-blueprint.xml @@ -0,0 +1,35 @@ + + + + + + + + + + + + + + + + + PceListener + ServiceHandlerOperations + ServiceHandler + RendererListener + + + + + + + + + diff --git a/nbinotifications/src/main/resources/publisher.properties b/nbinotifications/src/main/resources/publisher.properties new file mode 100644 index 000000000..ecbc15631 --- /dev/null +++ b/nbinotifications/src/main/resources/publisher.properties @@ -0,0 +1,8 @@ +#Kafka Producer/AdminClient properties +bootstrap.servers=localhost:9092 +acks=all +retries=3 +max.in.flight.requests.per.connection=1 +batch.size=16384 +linger.ms=1 +buffer.memory=33554432 diff --git a/nbinotifications/src/main/resources/subscriber.properties b/nbinotifications/src/main/resources/subscriber.properties new file mode 100644 index 000000000..0c3e96ef7 --- /dev/null +++ b/nbinotifications/src/main/resources/subscriber.properties @@ -0,0 +1,5 @@ +#Kafka Consumer properties +bootstrap.servers=localhost:9092 +enable.auto.commit=true +auto.commit.interval.ms=1000 +auto.offset.reset=earliest diff --git a/nbinotifications/src/test/java/org/opendaylight/transportpce/nbinotifications/consumer/SubscriberTest.java b/nbinotifications/src/test/java/org/opendaylight/transportpce/nbinotifications/consumer/SubscriberTest.java new file mode 100644 index 000000000..de58a498a --- /dev/null +++ b/nbinotifications/src/test/java/org/opendaylight/transportpce/nbinotifications/consumer/SubscriberTest.java @@ -0,0 +1,57 @@ +/* + * Copyright © 2020 Orange, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.transportpce.nbinotifications.consumer; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.MockConsumer; +import org.apache.kafka.clients.consumer.OffsetResetStrategy; +import org.apache.kafka.common.TopicPartition; +import org.junit.Before; +import org.junit.Test; +import org.opendaylight.transportpce.nbinotifications.utils.NotificationServiceDataUtils; +import org.opendaylight.transportpce.test.AbstractTest; +import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.get.notifications.service.output.NotificationService; + +public class SubscriberTest extends AbstractTest { + private static final String TOPIC = "topic"; + private static final int PARTITION = 0; + private MockConsumer mockConsumer; + private Subscriber subscriber; + + @Before + public void setUp() { + mockConsumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST); + subscriber = new Subscriber(mockConsumer); + } + + @Test + public void subscribeServiceShouldBeSuccessful() { + // from https://www.baeldung.com/kafka-mockconsumer + ConsumerRecord record = new ConsumerRecord( + TOPIC, PARTITION, 0L, "key", NotificationServiceDataUtils.buildReceivedEvent()); + mockConsumer.schedulePollTask(() -> { + mockConsumer.rebalance(Collections.singletonList(new TopicPartition(TOPIC, PARTITION))); + mockConsumer.addRecord(record); + }); + + Map startOffsets = new HashMap<>(); + TopicPartition tp = new TopicPartition(TOPIC, PARTITION); + startOffsets.put(tp, 0L); + mockConsumer.updateBeginningOffsets(startOffsets); + List result = subscriber.subscribeService(TOPIC); + assertEquals("There should be 1 record", 1, result.size()); + assertTrue("Consumer should be closed", mockConsumer.closed()); + } +} diff --git a/nbinotifications/src/test/java/org/opendaylight/transportpce/nbinotifications/impl/NbiNotificationsImplTest.java b/nbinotifications/src/test/java/org/opendaylight/transportpce/nbinotifications/impl/NbiNotificationsImplTest.java new file mode 100644 index 000000000..da825509a --- /dev/null +++ b/nbinotifications/src/test/java/org/opendaylight/transportpce/nbinotifications/impl/NbiNotificationsImplTest.java @@ -0,0 +1,50 @@ +/* + * Copyright © 2020 Orange, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.transportpce.nbinotifications.impl; + +import static org.junit.Assert.assertNull; + +import com.google.common.util.concurrent.ListenableFuture; +import java.util.concurrent.ExecutionException; +import org.junit.Before; +import org.junit.Test; +import org.opendaylight.transportpce.common.converter.JsonStringConverter; +import org.opendaylight.transportpce.test.AbstractTest; +import org.opendaylight.yang.gen.v1.http.org.openroadm.common.service.types.rev190531.ConnectionType; +import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.GetNotificationsServiceInputBuilder; +import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.GetNotificationsServiceOutput; +import org.opendaylight.yangtools.yang.common.RpcResult; + +public class NbiNotificationsImplTest extends AbstractTest { + private NbiNotificationsImpl nbiNotificationsImpl; + + @Before + public void setUp() { + JsonStringConverter converter = new JsonStringConverter<>( + getDataStoreContextUtil().getBindingDOMCodecServices()); + nbiNotificationsImpl = new NbiNotificationsImpl(converter, "localhost:8080"); + } + + public void getNotificationsServiceEmptyDataTest() throws InterruptedException, ExecutionException { + ListenableFuture> result = + nbiNotificationsImpl.getNotificationsService(new GetNotificationsServiceInputBuilder().build()); + assertNull("Should be null", result.get().getResult().getNotificationService()); + } + + @Test + public void getNotificationsServiceTest() throws InterruptedException, ExecutionException { + GetNotificationsServiceInputBuilder builder = new GetNotificationsServiceInputBuilder(); + builder.setGroupId("groupId"); + builder.setIdConsumer("consumerId"); + builder.setConnectionType(ConnectionType.Service); + ListenableFuture> result = + nbiNotificationsImpl.getNotificationsService(builder.build()); + assertNull("Should be null", result.get().getResult().getNotificationService()); + } +} diff --git a/nbinotifications/src/test/java/org/opendaylight/transportpce/nbinotifications/impl/NbiNotificationsProviderTest.java b/nbinotifications/src/test/java/org/opendaylight/transportpce/nbinotifications/impl/NbiNotificationsProviderTest.java new file mode 100644 index 000000000..178fc4243 --- /dev/null +++ b/nbinotifications/src/test/java/org/opendaylight/transportpce/nbinotifications/impl/NbiNotificationsProviderTest.java @@ -0,0 +1,50 @@ +/* + * Copyright © 2020 Orange, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.transportpce.nbinotifications.impl; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +import java.util.Arrays; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; +import org.opendaylight.mdsal.binding.api.NotificationService; +import org.opendaylight.mdsal.binding.api.RpcProviderService; +import org.opendaylight.transportpce.nbinotifications.listener.NbiNotificationsListenerImpl; +import org.opendaylight.transportpce.test.AbstractTest; + +public class NbiNotificationsProviderTest extends AbstractTest { + + @Mock + RpcProviderService rpcProviderRegistry; + + @Mock + private NotificationService notificationService; + + @Before + public void init() { + MockitoAnnotations.openMocks(this); + + } + + @Test + public void initTest() { + NbiNotificationsProvider provider = new NbiNotificationsProvider( + Arrays.asList("topic1", "topic2"), "localhost:8080", "localhost:8080", + rpcProviderRegistry, notificationService, + getDataStoreContextUtil().getBindingDOMCodecServices()); + provider.init(); + verify(rpcProviderRegistry, times(1)) + .registerRpcImplementation(any(), any(NbiNotificationsImpl.class)); + verify(notificationService, times(1)) + .registerNotificationListener(any(NbiNotificationsListenerImpl.class)); + } +} diff --git a/nbinotifications/src/test/java/org/opendaylight/transportpce/nbinotifications/listener/NbiNotificationsListenerImplTest.java b/nbinotifications/src/test/java/org/opendaylight/transportpce/nbinotifications/listener/NbiNotificationsListenerImplTest.java new file mode 100644 index 000000000..cb8faf242 --- /dev/null +++ b/nbinotifications/src/test/java/org/opendaylight/transportpce/nbinotifications/listener/NbiNotificationsListenerImplTest.java @@ -0,0 +1,54 @@ +/* + * Copyright © 2021 Orange, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.transportpce.nbinotifications.listener; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +import java.util.Map; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; +import org.opendaylight.transportpce.nbinotifications.producer.Publisher; +import org.opendaylight.transportpce.test.AbstractTest; +import org.opendaylight.yang.gen.v1.http.org.openroadm.common.service.types.rev190531.ConnectionType; +import org.opendaylight.yang.gen.v1.http.org.openroadm.common.state.types.rev181130.State; +import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.PublishNotificationService; +import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.PublishNotificationServiceBuilder; + +public class NbiNotificationsListenerImplTest extends AbstractTest { + @Mock + private Publisher publisher; + + @Before + public void setUp() { + MockitoAnnotations.openMocks(this); + } + + @Test + public void onPublishNotificationServiceTest() { + NbiNotificationsListenerImpl listener = new NbiNotificationsListenerImpl(Map.of("test", publisher)); + PublishNotificationService notification = new PublishNotificationServiceBuilder().setTopic("test") + .setCommonId("commonId").setConnectionType(ConnectionType.Service).setMessage("Service deleted") + .setOperationalState(State.OutOfService).setServiceName("service name").build(); + listener.onPublishNotificationService(notification); + verify(publisher, times(1)).sendEvent(any()); + } + + @Test + public void onPublishNotificationServiceWrongTopicTest() { + NbiNotificationsListenerImpl listener = new NbiNotificationsListenerImpl(Map.of("test", publisher)); + PublishNotificationService notification = new PublishNotificationServiceBuilder().setTopic("wrongtopic") + .setCommonId("commonId").setConnectionType(ConnectionType.Service).setMessage("Service deleted") + .setOperationalState(State.OutOfService).setServiceName("service name").build(); + listener.onPublishNotificationService(notification); + verify(publisher, times(0)).sendEvent(any()); + } +} diff --git a/nbinotifications/src/test/java/org/opendaylight/transportpce/nbinotifications/producer/PublisherTest.java b/nbinotifications/src/test/java/org/opendaylight/transportpce/nbinotifications/producer/PublisherTest.java new file mode 100644 index 000000000..b3a5b2edb --- /dev/null +++ b/nbinotifications/src/test/java/org/opendaylight/transportpce/nbinotifications/producer/PublisherTest.java @@ -0,0 +1,55 @@ +/* + * Copyright © 2020 Orange, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.transportpce.nbinotifications.producer; + +import static org.junit.Assert.assertEquals; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.Map; +import org.apache.kafka.clients.producer.MockProducer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.junit.Before; +import org.junit.Test; +import org.opendaylight.transportpce.common.converter.JsonStringConverter; +import org.opendaylight.transportpce.nbinotifications.serialization.ConfigConstants; +import org.opendaylight.transportpce.nbinotifications.serialization.NotificationServiceSerializer; +import org.opendaylight.transportpce.test.AbstractTest; +import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.NotificationService; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.codec.gson.JSONCodecFactorySupplier; + +public class PublisherTest extends AbstractTest { + private JsonStringConverter converter; + private Publisher publisher; + private MockProducer mockProducer; + + @Before + public void setUp() { + NotificationServiceSerializer serializer = new NotificationServiceSerializer(); + Map properties = Map.of(ConfigConstants.CONVERTER , serializer); + serializer.configure(properties, false); + mockProducer = new MockProducer<>(true, new StringSerializer(), serializer); + converter = new JsonStringConverter( + getDataStoreContextUtil().getBindingDOMCodecServices()); + publisher = new Publisher("test",mockProducer); + } + + @Test + public void sendEventShouldBeSuccessful() throws IOException { + String json = Files.readString(Paths.get("src/test/resources/event.json")); + NotificationService notificationService = converter + .createDataObjectFromJsonString(YangInstanceIdentifier.of(NotificationService.QNAME), + json, JSONCodecFactorySupplier.RFC7951); + publisher.sendEvent(notificationService); + assertEquals("We should have one message", 1, mockProducer.history().size()); + assertEquals("Key should be test", "test",mockProducer.history().get(0).key()); + } + +} diff --git a/nbinotifications/src/test/java/org/opendaylight/transportpce/nbinotifications/serialization/NotificationServiceDeserializerTest.java b/nbinotifications/src/test/java/org/opendaylight/transportpce/nbinotifications/serialization/NotificationServiceDeserializerTest.java new file mode 100644 index 000000000..928c2471a --- /dev/null +++ b/nbinotifications/src/test/java/org/opendaylight/transportpce/nbinotifications/serialization/NotificationServiceDeserializerTest.java @@ -0,0 +1,35 @@ +/* + * Copyright © 2021 Orange, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.transportpce.nbinotifications.serialization; + +import static org.junit.Assert.assertEquals; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.Map; +import org.junit.Test; +import org.opendaylight.transportpce.common.converter.JsonStringConverter; +import org.opendaylight.transportpce.test.AbstractTest; +import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.get.notifications.service.output.NotificationService; + +public class NotificationServiceDeserializerTest extends AbstractTest { + + @Test + public void deserializeTest() throws IOException { + JsonStringConverter converter = + new JsonStringConverter<>(getDataStoreContextUtil().getBindingDOMCodecServices()); + NotificationServiceDeserializer deserializer = new NotificationServiceDeserializer(); + Map configs = Map.of(ConfigConstants.CONVERTER, converter); + deserializer.configure(configs, false); + NotificationService readEvent = deserializer.deserialize("Test", + Files.readAllBytes(Paths.get("src/test/resources/event.json"))); + deserializer.close(); + assertEquals("Service name should be service1", "service1", readEvent.getServiceName()); + } +} diff --git a/nbinotifications/src/test/java/org/opendaylight/transportpce/nbinotifications/serialization/NotificationServiceSerializerTest.java b/nbinotifications/src/test/java/org/opendaylight/transportpce/nbinotifications/serialization/NotificationServiceSerializerTest.java new file mode 100644 index 000000000..3fe658b6b --- /dev/null +++ b/nbinotifications/src/test/java/org/opendaylight/transportpce/nbinotifications/serialization/NotificationServiceSerializerTest.java @@ -0,0 +1,44 @@ +/* + * Copyright © 2021 Orange, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.transportpce.nbinotifications.serialization; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.Map; +import org.junit.Test; +import org.opendaylight.transportpce.common.converter.JsonStringConverter; +import org.opendaylight.transportpce.test.AbstractTest; +import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.NotificationService; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.codec.gson.JSONCodecFactorySupplier; + +public class NotificationServiceSerializerTest extends AbstractTest { + + @Test + public void serializeTest() throws IOException { + JsonStringConverter converter = + new JsonStringConverter<>(getDataStoreContextUtil().getBindingDOMCodecServices()); + String json = Files.readString(Paths.get("src/test/resources/event.json")); + NotificationService notificationService = converter + .createDataObjectFromJsonString(YangInstanceIdentifier.of(NotificationService.QNAME), + json, JSONCodecFactorySupplier.RFC7951); + NotificationServiceSerializer serializer = new NotificationServiceSerializer(); + Map configs = Map.of(ConfigConstants.CONVERTER, converter); + serializer.configure(configs, false); + byte[] data = serializer.serialize("test", notificationService); + serializer.close(); + assertNotNull("Serialized data should not be null", data); + String expectedJson = Files.readString(Paths.get("src/test/resources/expected_event.json")); + assertEquals("The event should be equals", expectedJson, new String(data, StandardCharsets.UTF_8)); + } +} diff --git a/nbinotifications/src/test/java/org/opendaylight/transportpce/nbinotifications/utils/NotificationServiceDataUtils.java b/nbinotifications/src/test/java/org/opendaylight/transportpce/nbinotifications/utils/NotificationServiceDataUtils.java new file mode 100644 index 000000000..44b695888 --- /dev/null +++ b/nbinotifications/src/test/java/org/opendaylight/transportpce/nbinotifications/utils/NotificationServiceDataUtils.java @@ -0,0 +1,98 @@ +/* + * Copyright © 2020 Orange, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.transportpce.nbinotifications.utils; + +import org.opendaylight.yang.gen.v1.http.org.openroadm.common.node.types.rev181130.NodeIdType; +import org.opendaylight.yang.gen.v1.http.org.openroadm.common.service.types.rev190531.ConnectionType; +import org.opendaylight.yang.gen.v1.http.org.openroadm.common.service.types.rev190531.service.endpoint.RxDirection; +import org.opendaylight.yang.gen.v1.http.org.openroadm.common.service.types.rev190531.service.endpoint.TxDirection; +import org.opendaylight.yang.gen.v1.http.org.openroadm.common.service.types.rev190531.service.lgx.LgxBuilder; +import org.opendaylight.yang.gen.v1.http.org.openroadm.common.service.types.rev190531.service.port.PortBuilder; +import org.opendaylight.yang.gen.v1.http.org.openroadm.common.state.types.rev181130.State; +import org.opendaylight.yang.gen.v1.http.org.openroadm.service.format.rev190531.ServiceFormat; +import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.NotificationService; +import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.NotificationServiceBuilder; +import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.notification.service.ServiceAEndBuilder; +import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.notification.service.ServiceZEndBuilder; +import org.opendaylight.yangtools.yang.common.Uint32; + +public final class NotificationServiceDataUtils { + + private NotificationServiceDataUtils() { + } + + public static NotificationService buildSendEventInput() { + NotificationServiceBuilder notificationServiceBuilder = new NotificationServiceBuilder() + .setMessage("message") + .setServiceName("service1") + .setOperationalState(State.InService) + .setResponseFailed("") + .setCommonId("commond-id") + .setConnectionType(ConnectionType.Service) + .setServiceZEnd(getServiceZEndBuild().build()) + .setServiceAEnd(getServiceAEndBuild().build()); + + return notificationServiceBuilder.build(); + } + + public static org.opendaylight.yang.gen.v1 + .nbi.notifications.rev201130.get.notifications.service.output.NotificationService buildReceivedEvent() { + org.opendaylight.yang.gen.v1 + .nbi.notifications.rev201130.get.notifications.service.output.NotificationServiceBuilder + notificationServiceBuilder = new org.opendaylight.yang.gen.v1 + .nbi.notifications.rev201130.get.notifications.service.output.NotificationServiceBuilder() + .setMessage("message") + .setServiceName("service1") + .setOperationalState(State.InService) + .setResponseFailed("") + .setCommonId("commond-id") + .setConnectionType(ConnectionType.Service) + .setServiceZEnd(getServiceZEndBuild().build()) + .setServiceAEnd(getServiceAEndBuild().build()); + + return notificationServiceBuilder.build(); + } + + public static ServiceAEndBuilder getServiceAEndBuild() { + return new ServiceAEndBuilder() + .setClli("clli").setServiceFormat(ServiceFormat.OC).setServiceRate(Uint32.valueOf(1)) + .setNodeId(new NodeIdType("XPONDER-1-2")) + .setTxDirection(getTxDirection()) + .setRxDirection(getRxDirection()); + } + + public static ServiceZEndBuilder getServiceZEndBuild() { + return new ServiceZEndBuilder() + .setClli("clli").setServiceFormat(ServiceFormat.OC).setServiceRate(Uint32.valueOf(1)) + .setNodeId(new NodeIdType("XPONDER-1-2")) + .setTxDirection(getTxDirection()) + .setRxDirection(getRxDirection()); + } + + private static TxDirection getTxDirection() { + return new org.opendaylight.yang.gen.v1.http.org.openroadm.common.service.types.rev190531.service + .endpoint.TxDirectionBuilder().setPort(new PortBuilder().setPortDeviceName("device name") + .setPortName("port name").setPortRack("port rack").setPortShelf("port shelf") + .setPortSlot("port slot").setPortSubSlot("port subslot").setPortType("port type").build()) + .setLgx(new LgxBuilder().setLgxDeviceName("lgx device name").setLgxPortName("lgx port name") + .setLgxPortRack("lgx port rack").setLgxPortShelf("lgx port shelf").build()) + .build(); + } + + private static RxDirection getRxDirection() { + return new org.opendaylight.yang.gen.v1.http.org.openroadm.common.service.types.rev190531.service + .endpoint.RxDirectionBuilder() + .setPort(new PortBuilder().setPortDeviceName("device name").setPortName("port name") + .setPortRack("port rack").setPortShelf("port shelf").setPortSlot("port slot") + .setPortSubSlot("port subslot").setPortType("port type").build()) + .setLgx(new LgxBuilder().setLgxDeviceName("lgx device name") + .setLgxPortName("lgx port name").setLgxPortRack("lgx port rack") + .setLgxPortShelf("lgx port shelf").build()) + .build(); + } +} diff --git a/nbinotifications/src/test/resources/event.json b/nbinotifications/src/test/resources/event.json new file mode 100644 index 000000000..63c7bec73 --- /dev/null +++ b/nbinotifications/src/test/resources/event.json @@ -0,0 +1,90 @@ +{ + "notification-service": { + "service-name": "service1", + "service-a-end": { + "service-format": "OC", + "node-id": "XPONDER-1-2", + "service-rate": 1, + "clli": "clli", + "tx-direction": { + "port": { + "port-slot": "port slot", + "port-device-name": "device name", + "port-rack": "port rack", + "port-shelf": "port shelf", + "port-type": "port type", + "port-sub-slot": "port subslot", + "port-name": "port name" + }, + "lgx": { + "lgx-port-name": "lgx port name", + "lgx-port-shelf": "lgx port shelf", + "lgx-port-rack": "lgx port rack", + "lgx-device-name": "lgx device name" + } + }, + "rx-direction": { + "port": { + "port-slot": "port slot", + "port-device-name": "device name", + "port-rack": "port rack", + "port-shelf": "port shelf", + "port-type": "port type", + "port-sub-slot": "port subslot", + "port-name": "port name" + }, + "lgx": { + "lgx-port-name": "lgx port name", + "lgx-port-shelf": "lgx port shelf", + "lgx-port-rack": "lgx port rack", + "lgx-device-name": "lgx device name" + } + } + }, + "common-id": "commond-id", + "operational-state": "inService", + "connection-type": "service", + "service-z-end": { + "service-format": "OC", + "node-id": "XPONDER-1-2", + "service-rate": 1, + "clli": "clli", + "tx-direction": { + "port": { + "port-slot": "port slot", + "port-device-name": "device name", + "port-rack": "port rack", + "port-shelf": "port shelf", + "port-type": "port type", + "port-sub-slot": "port subslot", + "port-name": "port name" + }, + "lgx": { + "lgx-port-name": "lgx port name", + "lgx-port-shelf": "lgx port shelf", + "lgx-port-rack": "lgx port rack", + "lgx-device-name": "lgx device name" + } + }, + "rx-direction": { + "port": { + "port-slot": "port slot", + "port-device-name": "device name", + "port-rack": "port rack", + "port-shelf": "port shelf", + "port-type": "port type", + "port-sub-slot": "port subslot", + "port-name": "port name" + }, + "lgx": { + "lgx-port-name": "lgx port name", + "lgx-port-shelf": "lgx port shelf", + "lgx-port-rack": "lgx port rack", + "lgx-device-name": "lgx device name" + } + } + }, + "message": "message", + "response-failed": "" + } +} diff --git a/nbinotifications/src/test/resources/expected_event.json b/nbinotifications/src/test/resources/expected_event.json new file mode 100644 index 000000000..2b8924df3 --- /dev/null +++ b/nbinotifications/src/test/resources/expected_event.json @@ -0,0 +1 @@ +{"notification-service":{"service-name":"service1","service-a-end":{"service-format":"OC","node-id":"XPONDER-1-2","service-rate":1,"clli":"clli","tx-direction":{"port":{"port-slot":"port slot","port-device-name":"device name","port-rack":"port rack","port-shelf":"port shelf","port-type":"port type","port-sub-slot":"port subslot","port-name":"port name"},"lgx":{"lgx-port-name":"lgx port name","lgx-port-shelf":"lgx port shelf","lgx-port-rack":"lgx port rack","lgx-device-name":"lgx device name"}},"rx-direction":{"port":{"port-slot":"port slot","port-device-name":"device name","port-rack":"port rack","port-shelf":"port shelf","port-type":"port type","port-sub-slot":"port subslot","port-name":"port name"},"lgx":{"lgx-port-name":"lgx port name","lgx-port-shelf":"lgx port shelf","lgx-port-rack":"lgx port rack","lgx-device-name":"lgx device name"}}},"common-id":"commond-id","operational-state":"inService","connection-type":"service","service-z-end":{"service-format":"OC","node-id":"XPONDER-1-2","service-rate":1,"clli":"clli","tx-direction":{"port":{"port-slot":"port slot","port-device-name":"device name","port-rack":"port rack","port-shelf":"port shelf","port-type":"port type","port-sub-slot":"port subslot","port-name":"port name"},"lgx":{"lgx-port-name":"lgx port name","lgx-port-shelf":"lgx port shelf","lgx-port-rack":"lgx port rack","lgx-device-name":"lgx device name"}},"rx-direction":{"port":{"port-slot":"port slot","port-device-name":"device name","port-rack":"port rack","port-shelf":"port shelf","port-type":"port type","port-sub-slot":"port subslot","port-name":"port name"},"lgx":{"lgx-port-name":"lgx port name","lgx-port-shelf":"lgx port shelf","lgx-port-rack":"lgx port rack","lgx-device-name":"lgx device name"}}},"message":"message","response-failed":""}} \ No newline at end of file diff --git a/nbinotifications/src/test/resources/publisher.properties b/nbinotifications/src/test/resources/publisher.properties new file mode 100644 index 000000000..ecbc15631 --- /dev/null +++ b/nbinotifications/src/test/resources/publisher.properties @@ -0,0 +1,8 @@ +#Kafka Producer/AdminClient properties +bootstrap.servers=localhost:9092 +acks=all +retries=3 +max.in.flight.requests.per.connection=1 +batch.size=16384 +linger.ms=1 +buffer.memory=33554432 diff --git a/nbinotifications/src/test/resources/subscriber.properties b/nbinotifications/src/test/resources/subscriber.properties new file mode 100644 index 000000000..0c3e96ef7 --- /dev/null +++ b/nbinotifications/src/test/resources/subscriber.properties @@ -0,0 +1,5 @@ +#Kafka Consumer properties +bootstrap.servers=localhost:9092 +enable.auto.commit=true +auto.commit.interval.ms=1000 +auto.offset.reset=earliest diff --git a/pom.xml b/pom.xml index 7d26a8434..f8f8eea91 100644 --- a/pom.xml +++ b/pom.xml @@ -42,6 +42,7 @@ and is available at http://www.eclipse.org/legal/epl-v10.html INTERNAL pce servicehandler tapi + nbinotifications features karaf -- 2.36.6