Refactor NBINotification & add ServiceListener tests 72/96672/5
authorThierry Jiao <thierry.jiao@orange.com>
Thu, 24 Jun 2021 15:01:59 +0000 (17:01 +0200)
committerThierry Jiao <thierry.jiao@orange.com>
Tue, 3 Aug 2021 15:06:58 +0000 (17:06 +0200)
- Refactor Subscriber and Publisher from nbinotifications
- Update NBINotifications unit tests
- Add new unit test for ServiceListener
- Update func test 'test_nbinotifications.py'

JIRA: TRNSPRTPCE-471
Signed-off-by: Thierry Jiao <thierry.jiao@orange.com>
Change-Id: Ie1161740432f12176acfb8b31658abc4a1190f62

19 files changed:
nbinotifications/src/main/java/org/opendaylight/transportpce/nbinotifications/consumer/Subscriber.java
nbinotifications/src/main/java/org/opendaylight/transportpce/nbinotifications/consumer/SubscriberAlarm.java [deleted file]
nbinotifications/src/main/java/org/opendaylight/transportpce/nbinotifications/impl/NbiNotificationsImpl.java
nbinotifications/src/main/java/org/opendaylight/transportpce/nbinotifications/impl/NbiNotificationsProvider.java
nbinotifications/src/main/java/org/opendaylight/transportpce/nbinotifications/listener/NbiNotificationsListenerImpl.java
nbinotifications/src/main/java/org/opendaylight/transportpce/nbinotifications/producer/Publisher.java
nbinotifications/src/main/java/org/opendaylight/transportpce/nbinotifications/producer/PublisherAlarm.java [deleted file]
nbinotifications/src/test/java/org/opendaylight/transportpce/nbinotifications/consumer/SubscriberTest.java
nbinotifications/src/test/java/org/opendaylight/transportpce/nbinotifications/impl/NbiNotificationsImplTest.java
nbinotifications/src/test/java/org/opendaylight/transportpce/nbinotifications/listener/NbiNotificationsListenerImplTest.java
nbinotifications/src/test/java/org/opendaylight/transportpce/nbinotifications/producer/PublisherTest.java
nbinotifications/src/test/java/org/opendaylight/transportpce/nbinotifications/serialization/NotificationAlarmServiceDeserializerTest.java [new file with mode: 0755]
nbinotifications/src/test/java/org/opendaylight/transportpce/nbinotifications/serialization/NotificationAlarmServiceSerializerTest.java [new file with mode: 0755]
nbinotifications/src/test/java/org/opendaylight/transportpce/nbinotifications/utils/NotificationServiceDataUtils.java
nbinotifications/src/test/resources/event_alarm_service.json [new file with mode: 0755]
nbinotifications/src/test/resources/expected_event_alarm_service.json [new file with mode: 0755]
servicehandler/src/test/java/org/opendaylight/transportpce/servicehandler/listeners/ServiceListenerTest.java [new file with mode: 0755]
tests/transportpce_tests/2.2.1/test_nbinotifications.py
tests/transportpce_tests/common/test_utils.py

index 735c6a8d25a2de0edb64b3790716300a72f2d7a3..7ebc0bdf620cb09a11bc9e3e31fe691298374f5a 100644 (file)
@@ -19,43 +19,43 @@ import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.serialization.StringDeserializer;
+import org.eclipse.jdt.annotation.NonNull;
 import org.opendaylight.transportpce.common.converter.JsonStringConverter;
 import org.opendaylight.transportpce.nbinotifications.serialization.ConfigConstants;
-import org.opendaylight.transportpce.nbinotifications.serialization.NotificationServiceDeserializer;
 import org.opendaylight.transportpce.nbinotifications.utils.NbiNotificationsUtils;
-import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.get.notifications.service.output.NotificationService;
+import org.opendaylight.yangtools.yang.binding.DataObject;
+import org.opendaylight.yangtools.yang.common.QName;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class Subscriber {
+public class Subscriber<T extends DataObject, D> {
     private static final Logger LOG = LoggerFactory.getLogger(Subscriber.class);
 
-    private final Consumer<String, NotificationService> consumer;
+    private final Consumer<String, D> consumer;
 
-    public Subscriber(String id, String groupId, String suscriberServer,
-            JsonStringConverter<org.opendaylight.yang.gen.v1
-                .nbi.notifications.rev210628.NotificationService> deserializer) {
+    public Subscriber(String id, String groupId, String subscriberServer, JsonStringConverter<T> deserializer,
+                      Class<?> deserializerConf) {
         Properties propsConsumer = NbiNotificationsUtils.loadProperties("subscriber.properties");
         propsConsumer.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
         propsConsumer.put(ConsumerConfig.CLIENT_ID_CONFIG, id);
         propsConsumer.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
-        propsConsumer.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG , NotificationServiceDeserializer.class);
+        propsConsumer.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG , deserializerConf);
         propsConsumer.put(ConfigConstants.CONVERTER , deserializer);
-        if (suscriberServer != null && !suscriberServer.isBlank()) {
-            propsConsumer.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, suscriberServer);
+        if (subscriberServer != null && !subscriberServer.isBlank()) {
+            propsConsumer.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, subscriberServer);
         }
-        LOG.info("Suscribing for group id {}, client config id {} with properties {}", groupId, id, propsConsumer);
+        LOG.info("Subscribing for group id {}, client config id {} with properties {}", groupId, id, propsConsumer);
         consumer = new KafkaConsumer<>(propsConsumer);
     }
 
-    public List<NotificationService> subscribeService(String topicName) {
+    public List<D> subscribe(String topicName, @NonNull QName name) {
         LOG.info("Subscribe request to topic '{}' ", topicName);
         consumer.subscribe(Collections.singleton(topicName));
-        final ConsumerRecords<String, NotificationService> consumerRecords = consumer.poll(Duration.ofMillis(1000));
-        List<NotificationService> notificationServiceList = new ArrayList<>();
-        YangInstanceIdentifier.of(NotificationService.QNAME);
-        for (ConsumerRecord<String, NotificationService> record : consumerRecords) {
+        final ConsumerRecords<String, D> consumerRecords = consumer.poll(Duration.ofMillis(1000));
+        List<D> notificationServiceList = new ArrayList<>();
+        YangInstanceIdentifier.of(name);
+        for (ConsumerRecord<String, D> record : consumerRecords) {
             if (record.value() != null) {
                 notificationServiceList.add(record.value());
             }
@@ -66,7 +66,7 @@ public class Subscriber {
         return notificationServiceList;
     }
 
-    @VisibleForTesting public Subscriber(Consumer<String, NotificationService> consumer) {
+    @VisibleForTesting public Subscriber(Consumer<String, D> consumer) {
         this.consumer = consumer;
     }
 }
diff --git a/nbinotifications/src/main/java/org/opendaylight/transportpce/nbinotifications/consumer/SubscriberAlarm.java b/nbinotifications/src/main/java/org/opendaylight/transportpce/nbinotifications/consumer/SubscriberAlarm.java
deleted file mode 100644 (file)
index f3bf9df..0000000
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Copyright © 2021 Orange, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.transportpce.nbinotifications.consumer;
-
-import com.google.common.annotations.VisibleForTesting;
-import java.time.Duration;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Properties;
-import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.opendaylight.transportpce.common.converter.JsonStringConverter;
-import org.opendaylight.transportpce.nbinotifications.serialization.ConfigConstants;
-import org.opendaylight.transportpce.nbinotifications.serialization.NotificationAlarmServiceDeserializer;
-import org.opendaylight.transportpce.nbinotifications.utils.NbiNotificationsUtils;
-import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.get.notifications.alarm.service.output.NotificationAlarmService;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class SubscriberAlarm {
-    private static final Logger LOG = LoggerFactory.getLogger(SubscriberAlarm.class);
-
-    private final Consumer<String, NotificationAlarmService> consumer;
-
-    public SubscriberAlarm(String id, String groupId, String subscriberServer,
-                           JsonStringConverter<org.opendaylight.yang.gen.v1
-                .nbi.notifications.rev210628.NotificationAlarmService> deserializer) {
-        Properties propsConsumer = NbiNotificationsUtils.loadProperties("subscriber.properties");
-        propsConsumer.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
-        propsConsumer.put(ConsumerConfig.CLIENT_ID_CONFIG, id);
-        propsConsumer.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
-        propsConsumer.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG , NotificationAlarmServiceDeserializer.class);
-        propsConsumer.put(ConfigConstants.CONVERTER , deserializer);
-        if (subscriberServer != null && !subscriberServer.isBlank()) {
-            propsConsumer.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, subscriberServer);
-        }
-        LOG.info("Subscribing for group id {}, client config id {} with properties {}", groupId, id, propsConsumer);
-        consumer = new KafkaConsumer<>(propsConsumer);
-    }
-
-    public List<NotificationAlarmService> subscribeAlarm(String topicName) {
-        LOG.info("Subscribe request to topic '{}' ", topicName);
-        consumer.subscribe(Collections.singleton(topicName));
-        final ConsumerRecords<String, NotificationAlarmService> consumerRecords = consumer
-                .poll(Duration.ofMillis(1000));
-        List<NotificationAlarmService> notificationAlarmServiceList = new ArrayList<>();
-        YangInstanceIdentifier.of(NotificationAlarmService.QNAME);
-        for (ConsumerRecord<String, NotificationAlarmService> record : consumerRecords) {
-            if (record.value() != null) {
-                notificationAlarmServiceList.add(record.value());
-            }
-        }
-        LOG.info("Getting records '{}' ", notificationAlarmServiceList);
-        consumer.unsubscribe();
-        consumer.close();
-        return notificationAlarmServiceList;
-    }
-
-    @VisibleForTesting public SubscriberAlarm(Consumer<String, NotificationAlarmService> consumer) {
-        this.consumer = consumer;
-    }
-}
index d7201d87ad63fc627f19d1dc88f0a678af5aca1b..60fd5517f71edbe1ce4fd958a665cc868b30f52e 100644 (file)
@@ -11,7 +11,8 @@ import com.google.common.util.concurrent.ListenableFuture;
 import java.util.List;
 import org.opendaylight.transportpce.common.converter.JsonStringConverter;
 import org.opendaylight.transportpce.nbinotifications.consumer.Subscriber;
-import org.opendaylight.transportpce.nbinotifications.consumer.SubscriberAlarm;
+import org.opendaylight.transportpce.nbinotifications.serialization.NotificationAlarmServiceDeserializer;
+import org.opendaylight.transportpce.nbinotifications.serialization.NotificationServiceDeserializer;
 import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.GetNotificationsAlarmServiceInput;
 import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.GetNotificationsAlarmServiceOutput;
 import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.GetNotificationsAlarmServiceOutputBuilder;
@@ -51,9 +52,11 @@ public class NbiNotificationsImpl implements NbiNotificationsService {
             LOG.warn("Missing mandatory params for input {}", input);
             return RpcResultBuilder.success(new GetNotificationsServiceOutputBuilder().build()).buildFuture();
         }
-        Subscriber subscriber = new Subscriber(input.getIdConsumer(), input.getGroupId(), server, converterService);
+        Subscriber<org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.NotificationService,
+                NotificationService> subscriber = new Subscriber<>(input.getIdConsumer(), input.getGroupId(), server,
+                converterService, NotificationServiceDeserializer.class);
         List<NotificationService> notificationServiceList = subscriber
-                .subscribeService(input.getConnectionType().getName());
+                .subscribe(input.getConnectionType().getName(), NotificationService.QNAME);
         GetNotificationsServiceOutputBuilder output = new GetNotificationsServiceOutputBuilder()
                 .setNotificationService(notificationServiceList);
         return RpcResultBuilder.success(output.build()).buildFuture();
@@ -67,10 +70,11 @@ public class NbiNotificationsImpl implements NbiNotificationsService {
             LOG.warn("Missing mandatory params for input {}", input);
             return RpcResultBuilder.success(new GetNotificationsAlarmServiceOutputBuilder().build()).buildFuture();
         }
-        SubscriberAlarm subscriberAlarm = new SubscriberAlarm(input.getIdConsumer(), input.getGroupId(), server,
-                converterAlarmService);
-        List<NotificationAlarmService> notificationAlarmServiceList = subscriberAlarm
-                .subscribeAlarm("alarm" + input.getConnectionType().getName());
+        Subscriber<org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.NotificationAlarmService,
+                NotificationAlarmService> subscriber = new Subscriber<>(input.getIdConsumer(), input.getGroupId(),
+                server, converterAlarmService, NotificationAlarmServiceDeserializer.class);
+        List<NotificationAlarmService> notificationAlarmServiceList = subscriber
+                .subscribe("alarm" + input.getConnectionType().getName(), NotificationAlarmService.QNAME);
         GetNotificationsAlarmServiceOutputBuilder output = new GetNotificationsAlarmServiceOutputBuilder()
                 .setNotificationAlarmService(notificationAlarmServiceList);
         return RpcResultBuilder.success(output.build()).buildFuture();
index 3278416536026cfa21170a572918f6a7c432ad87..0d578de956fa8d4bad3fce7e837f0dec73fc9e73 100644 (file)
@@ -16,9 +16,11 @@ import org.opendaylight.mdsal.binding.dom.codec.spi.BindingDOMCodecServices;
 import org.opendaylight.transportpce.common.converter.JsonStringConverter;
 import org.opendaylight.transportpce.nbinotifications.listener.NbiNotificationsListenerImpl;
 import org.opendaylight.transportpce.nbinotifications.producer.Publisher;
-import org.opendaylight.transportpce.nbinotifications.producer.PublisherAlarm;
+import org.opendaylight.transportpce.nbinotifications.serialization.NotificationAlarmServiceSerializer;
+import org.opendaylight.transportpce.nbinotifications.serialization.NotificationServiceSerializer;
 import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.NbiNotificationsListener;
 import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.NbiNotificationsService;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.NotificationAlarmService;
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
 import org.opendaylight.yangtools.concepts.ObjectRegistration;
 import org.slf4j.Logger;
@@ -27,8 +29,9 @@ import org.slf4j.LoggerFactory;
 public class NbiNotificationsProvider {
 
     private static final Logger LOG = LoggerFactory.getLogger(NbiNotificationsProvider.class);
-    private static Map<String, Publisher> publishersServiceMap =  new HashMap<>();
-    private static Map<String, PublisherAlarm> publishersAlarmMap =  new HashMap<>();
+    private static Map<String, Publisher<org.opendaylight.yang.gen.v1
+            .nbi.notifications.rev210628.NotificationService>> publishersServiceMap =  new HashMap<>();
+    private static Map<String, Publisher<NotificationAlarmService>> publishersAlarmMap =  new HashMap<>();
 
     private final RpcProviderService rpcService;
     private ObjectRegistration<NbiNotificationsService> rpcRegistration;
@@ -36,8 +39,7 @@ public class NbiNotificationsProvider {
     private NotificationService notificationService;
     private final JsonStringConverter<org.opendaylight.yang.gen.v1
         .nbi.notifications.rev210628.NotificationService> converterService;
-    private final JsonStringConverter<org.opendaylight.yang.gen.v1
-            .nbi.notifications.rev210628.NotificationAlarmService> converterAlarmService;
+    private final JsonStringConverter<NotificationAlarmService> converterAlarmService;
     private final String subscriberServer;
 
 
@@ -50,12 +52,14 @@ public class NbiNotificationsProvider {
         converterService =  new JsonStringConverter<>(bindingDOMCodecServices);
         for (String topic: topicsService) {
             LOG.info("Creating publisher for topic {}", topic);
-            publishersServiceMap.put(topic, new Publisher(topic, publisherServer, converterService));
+            publishersServiceMap.put(topic, new Publisher<>(topic, publisherServer, converterService,
+                    NotificationServiceSerializer.class));
         }
         converterAlarmService = new JsonStringConverter<>(bindingDOMCodecServices);
         for (String topic: topicsAlarm) {
             LOG.info("Creating publisher for topic {}", topic);
-            publishersAlarmMap.put(topic, new PublisherAlarm(topic, publisherServer, converterAlarmService));
+            publishersAlarmMap.put(topic, new Publisher<>(topic, publisherServer, converterAlarmService,
+                    NotificationAlarmServiceSerializer.class));
         }
         this.subscriberServer = subscriberServer;
     }
@@ -75,10 +79,11 @@ public class NbiNotificationsProvider {
      * Method called when the blueprint container is destroyed.
      */
     public void close() {
-        for (Publisher publisher : publishersServiceMap.values()) {
+        for (Publisher<org.opendaylight.yang.gen.v1
+                .nbi.notifications.rev210628.NotificationService> publisher : publishersServiceMap.values()) {
             publisher.close();
         }
-        for (PublisherAlarm publisherAlarm : publishersAlarmMap.values()) {
+        for (Publisher<NotificationAlarmService> publisherAlarm : publishersAlarmMap.values()) {
             publisherAlarm.close();
         }
         rpcRegistration.close();
index 2e8427f641a12b34f7d315404f0d21ebfdb61a44..e72b2282a82f26827719ba0583167d6a8926b8cc 100644 (file)
@@ -9,9 +9,10 @@ package org.opendaylight.transportpce.nbinotifications.listener;
 
 import java.util.Map;
 import org.opendaylight.transportpce.nbinotifications.producer.Publisher;
-import org.opendaylight.transportpce.nbinotifications.producer.PublisherAlarm;
 import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.NbiNotificationsListener;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.NotificationAlarmService;
 import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.NotificationAlarmServiceBuilder;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.NotificationService;
 import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.NotificationServiceBuilder;
 import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.PublishNotificationAlarmService;
 import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.PublishNotificationService;
@@ -20,11 +21,11 @@ import org.slf4j.LoggerFactory;
 
 public class NbiNotificationsListenerImpl implements NbiNotificationsListener {
     private static final Logger LOG = LoggerFactory.getLogger(NbiNotificationsListenerImpl.class);
-    private Map<String, Publisher> publishersServiceMap;
-    private Map<String, PublisherAlarm> publishersAlarmMap;
+    private final Map<String, Publisher<NotificationService>> publishersServiceMap;
+    private final Map<String, Publisher<NotificationAlarmService>> publishersAlarmMap;
 
-    public NbiNotificationsListenerImpl(Map<String, Publisher> publishersServiceMap,
-                                        Map<String, PublisherAlarm> publishersAlarmMap) {
+    public NbiNotificationsListenerImpl(Map<String, Publisher<NotificationService>> publishersServiceMap,
+                                        Map<String, Publisher<NotificationAlarmService>> publishersAlarmMap) {
         this.publishersServiceMap = publishersServiceMap;
         this.publishersAlarmMap = publishersAlarmMap;
     }
@@ -37,14 +38,14 @@ public class NbiNotificationsListenerImpl implements NbiNotificationsListener {
             LOG.error("Unknown topic {}", topic);
             return;
         }
-        Publisher publisher = publishersServiceMap.get(topic);
+        Publisher<NotificationService> publisher = publishersServiceMap.get(topic);
         publisher.sendEvent(new NotificationServiceBuilder().setCommonId(notification.getCommonId())
                 .setConnectionType(notification.getConnectionType()).setMessage(notification.getMessage())
                 .setOperationalState(notification.getOperationalState())
                 .setResponseFailed(notification.getResponseFailed())
                 .setServiceAEnd(notification.getServiceAEnd())
                 .setServiceName(notification.getServiceName())
-                .setServiceZEnd(notification.getServiceZEnd()).build());
+                .setServiceZEnd(notification.getServiceZEnd()).build(), notification.getConnectionType().getName());
     }
 
     @Override
@@ -55,12 +56,12 @@ public class NbiNotificationsListenerImpl implements NbiNotificationsListener {
             LOG.error("Unknown topic {}", topic);
             return;
         }
-        PublisherAlarm publisherAlarm = publishersAlarmMap.get(topic);
+        Publisher<NotificationAlarmService> publisherAlarm = publishersAlarmMap.get(topic);
         publisherAlarm.sendEvent(new NotificationAlarmServiceBuilder().setConnectionType(notification
                 .getConnectionType())
                 .setMessage(notification.getMessage())
                 .setOperationalState(notification.getOperationalState())
                 .setServiceName(notification.getServiceName())
-                .build());
+                .build(), "alarm" + notification.getConnectionType().getName());
     }
 }
index 664a7c9a0867ad50ba07357202d877f54b57b62b..9f1479ab86a8e4b1082e23e9fd5b6d18c4fdbf07 100644 (file)
@@ -16,33 +16,32 @@ import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.opendaylight.transportpce.common.converter.JsonStringConverter;
 import org.opendaylight.transportpce.nbinotifications.serialization.ConfigConstants;
-import org.opendaylight.transportpce.nbinotifications.serialization.NotificationServiceSerializer;
 import org.opendaylight.transportpce.nbinotifications.utils.NbiNotificationsUtils;
-import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.NotificationService;
+import org.opendaylight.yangtools.yang.binding.DataObject;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class Publisher {
+public class Publisher<T extends DataObject> {
     private static final Logger LOG = LoggerFactory.getLogger(Publisher.class);
 
     private final String id;
-    private final Producer<String, NotificationService> producer;
+    private final Producer<String, T> producer;
 
-    public Publisher(String id, String publisherServer, JsonStringConverter<NotificationService> serializer) {
+    public Publisher(String id, String publisherServer, JsonStringConverter<T> serializer, Class<?> serializerConf) {
         Properties properties = NbiNotificationsUtils.loadProperties("publisher.properties");
         properties.put(ProducerConfig.CLIENT_ID_CONFIG, id);
         if (publisherServer != null && !publisherServer.isBlank()) {
             properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, publisherServer);
         }
         properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
-        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG , NotificationServiceSerializer.class);
+        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG , serializerConf);
         properties.put(ConfigConstants.CONVERTER , serializer);
-        LOG.info("Creationg publisher for id {} with properties {}", id, properties);
+        LOG.info("Creation publisher for id {} with properties {}", id, properties);
         producer = new KafkaProducer<>(properties);
         this.id = id;
     }
 
-    @VisibleForTesting Publisher(String id, Producer<String, NotificationService> producer) {
+    @VisibleForTesting Publisher(String id, Producer<String, T> producer) {
         this.producer = producer;
         this.id = id;
     }
@@ -51,9 +50,9 @@ public class Publisher {
         producer.close();
     }
 
-    public void sendEvent(NotificationService notificationService) {
-        LOG.info("SendEvent request to topic '{}' ", notificationService.getConnectionType().getName());
-        producer.send(new ProducerRecord<>(notificationService.getConnectionType().getName(), id, notificationService));
+    public void sendEvent(T notification, String topic) {
+        LOG.info("SendEvent request to topic '{}' ", topic);
+        producer.send(new ProducerRecord<>(topic, id, notification));
         producer.flush();
     }
 }
diff --git a/nbinotifications/src/main/java/org/opendaylight/transportpce/nbinotifications/producer/PublisherAlarm.java b/nbinotifications/src/main/java/org/opendaylight/transportpce/nbinotifications/producer/PublisherAlarm.java
deleted file mode 100644 (file)
index b275548..0000000
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Copyright © 2021 Orange, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.transportpce.nbinotifications.producer;
-
-import com.google.common.annotations.VisibleForTesting;
-import java.util.Properties;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.Producer;
-import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.opendaylight.transportpce.common.converter.JsonStringConverter;
-import org.opendaylight.transportpce.nbinotifications.serialization.ConfigConstants;
-import org.opendaylight.transportpce.nbinotifications.serialization.NotificationAlarmServiceSerializer;
-import org.opendaylight.transportpce.nbinotifications.utils.NbiNotificationsUtils;
-import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.NotificationAlarmService;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class PublisherAlarm {
-    private static final Logger LOG = LoggerFactory.getLogger(PublisherAlarm.class);
-
-    private final String id;
-    private final Producer<String, NotificationAlarmService> producer;
-
-    public PublisherAlarm(String id, String publisherServer, JsonStringConverter<NotificationAlarmService> serializer) {
-        Properties properties = NbiNotificationsUtils.loadProperties("publisher.properties");
-        properties.put(ProducerConfig.CLIENT_ID_CONFIG, id);
-        if (publisherServer != null && !publisherServer.isBlank()) {
-            properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, publisherServer);
-        }
-        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
-        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG , NotificationAlarmServiceSerializer.class);
-        properties.put(ConfigConstants.CONVERTER , serializer);
-        LOG.info("Creationg publisher for id {} with properties {}", id, properties);
-        producer = new KafkaProducer<>(properties);
-        this.id = id;
-    }
-
-    @VisibleForTesting
-    PublisherAlarm(String id, Producer<String, NotificationAlarmService> producer) {
-        this.producer = producer;
-        this.id = id;
-    }
-
-    public void close() {
-        producer.close();
-    }
-
-    public void sendEvent(NotificationAlarmService notificationAlarmService) {
-        LOG.info("SendEvent request to topic '{}' ", notificationAlarmService.getConnectionType().getName());
-        producer.send(new ProducerRecord<>("alarm" + notificationAlarmService.getConnectionType().getName(),
-                id, notificationAlarmService));
-        producer.flush();
-    }
-}
index 5427020e1622403646cf2879fa9eacec29b9e81d..168f43d257219429645cddad19547240c243b668 100644 (file)
@@ -22,24 +22,31 @@ import org.junit.Before;
 import org.junit.Test;
 import org.opendaylight.transportpce.nbinotifications.utils.NotificationServiceDataUtils;
 import org.opendaylight.transportpce.test.AbstractTest;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.get.notifications.alarm.service.output.NotificationAlarmService;
 import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.get.notifications.service.output.NotificationService;
 
 public class SubscriberTest extends AbstractTest {
     private static final String TOPIC = "topic";
     private static final int PARTITION = 0;
     private MockConsumer<String, NotificationService> mockConsumer;
-    private Subscriber subscriber;
+    private MockConsumer<String, NotificationAlarmService> mockConsumerAlarm;
+    private Subscriber<org.opendaylight.yang.gen.v1
+            .nbi.notifications.rev210628.NotificationService, NotificationService> subscriberService;
+    private Subscriber<org.opendaylight.yang.gen.v1
+            .nbi.notifications.rev210628.NotificationAlarmService, NotificationAlarmService> subscriberAlarmService;
 
     @Before
     public void setUp() {
         mockConsumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
-        subscriber = new Subscriber(mockConsumer);
+        mockConsumerAlarm = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
+        subscriberService = new Subscriber<>(mockConsumer);
+        subscriberAlarmService = new Subscriber<>(mockConsumerAlarm);
     }
 
     @Test
     public void subscribeServiceShouldBeSuccessful() {
         // from https://www.baeldung.com/kafka-mockconsumer
-        ConsumerRecord<String, NotificationService> record = new ConsumerRecord<String, NotificationService>(
+        ConsumerRecord<String, NotificationService> record = new ConsumerRecord<>(
                 TOPIC, PARTITION, 0L, "key", NotificationServiceDataUtils.buildReceivedEvent());
         mockConsumer.schedulePollTask(() -> {
             mockConsumer.rebalance(Collections.singletonList(new TopicPartition(TOPIC, PARTITION)));
@@ -50,8 +57,27 @@ public class SubscriberTest extends AbstractTest {
         TopicPartition tp = new TopicPartition(TOPIC, PARTITION);
         startOffsets.put(tp, 0L);
         mockConsumer.updateBeginningOffsets(startOffsets);
-        List<NotificationService> result = subscriber.subscribeService(TOPIC);
+        List<NotificationService> result = subscriberService.subscribe(TOPIC, NotificationService.QNAME);
         assertEquals("There should be 1 record", 1, result.size());
         assertTrue("Consumer should be closed", mockConsumer.closed());
     }
+
+    @Test
+    public void subscribeAlarmShouldBeSuccessful() {
+        // from https://www.baeldung.com/kafka-mockconsumer
+        ConsumerRecord<String, NotificationAlarmService> record = new ConsumerRecord<>(
+                TOPIC, PARTITION, 0L, "key", NotificationServiceDataUtils.buildReceivedAlarmEvent());
+        mockConsumerAlarm.schedulePollTask(() -> {
+            mockConsumerAlarm.rebalance(Collections.singletonList(new TopicPartition(TOPIC, PARTITION)));
+            mockConsumerAlarm.addRecord(record);
+        });
+
+        Map<TopicPartition, Long> startOffsets = new HashMap<>();
+        TopicPartition tp = new TopicPartition(TOPIC, PARTITION);
+        startOffsets.put(tp, 0L);
+        mockConsumerAlarm.updateBeginningOffsets(startOffsets);
+        List<NotificationAlarmService> result = subscriberAlarmService.subscribe(TOPIC, NotificationAlarmService.QNAME);
+        assertEquals("There should be 1 record", 1, result.size());
+        assertTrue("Consumer should be closed", mockConsumerAlarm.closed());
+    }
 }
index 9de03934c29522d7bc7a8c5fd9b8cf941cf81718..c121eb75a28c852bb7a4cd25420d353965700d8f 100644 (file)
@@ -16,6 +16,8 @@ import org.junit.Test;
 import org.opendaylight.transportpce.common.converter.JsonStringConverter;
 import org.opendaylight.transportpce.test.AbstractTest;
 import org.opendaylight.yang.gen.v1.http.org.openroadm.common.service.types.rev190531.ConnectionType;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.GetNotificationsAlarmServiceInputBuilder;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.GetNotificationsAlarmServiceOutput;
 import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.GetNotificationsServiceInputBuilder;
 import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.GetNotificationsServiceOutput;
 import org.opendaylight.yangtools.yang.common.RpcResult;
@@ -34,6 +36,7 @@ public class NbiNotificationsImplTest extends AbstractTest {
         nbiNotificationsImpl = new NbiNotificationsImpl(converter, converterAlarm,"localhost:8080");
     }
 
+    @Test
     public void getNotificationsServiceEmptyDataTest() throws InterruptedException, ExecutionException {
         ListenableFuture<RpcResult<GetNotificationsServiceOutput>> result =
                 nbiNotificationsImpl.getNotificationsService(new GetNotificationsServiceInputBuilder().build());
@@ -42,12 +45,23 @@ public class NbiNotificationsImplTest extends AbstractTest {
 
     @Test
     public void getNotificationsServiceTest() throws InterruptedException, ExecutionException {
-        GetNotificationsServiceInputBuilder builder = new GetNotificationsServiceInputBuilder();
-        builder.setGroupId("groupId");
-        builder.setIdConsumer("consumerId");
-        builder.setConnectionType(ConnectionType.Service);
+        GetNotificationsServiceInputBuilder builder = new GetNotificationsServiceInputBuilder()
+                .setGroupId("groupId")
+                .setIdConsumer("consumerId")
+                .setConnectionType(ConnectionType.Service);
         ListenableFuture<RpcResult<GetNotificationsServiceOutput>> result =
                 nbiNotificationsImpl.getNotificationsService(builder.build());
         assertNull("Should be null", result.get().getResult().getNotificationService());
     }
+
+    @Test
+    public void getNotificationsAlarmServiceTest() throws InterruptedException, ExecutionException {
+        GetNotificationsAlarmServiceInputBuilder builder = new GetNotificationsAlarmServiceInputBuilder()
+                .setGroupId("groupId")
+                .setIdConsumer("consumerId")
+                .setConnectionType(ConnectionType.Service);
+        ListenableFuture<RpcResult<GetNotificationsAlarmServiceOutput>> result =
+                nbiNotificationsImpl.getNotificationsAlarmService(builder.build());
+        assertNull("Should be null", result.get().getResult().getNotificationAlarmService());
+    }
 }
index 899baed01718849435cfda050030ae6bb104398d..d8e832ead06f4a2d1cb8d6f81939a12ab8fddb11 100644 (file)
@@ -8,6 +8,7 @@
 package org.opendaylight.transportpce.nbinotifications.listener;
 
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 
@@ -17,18 +18,21 @@ import org.junit.Test;
 import org.mockito.Mock;
 import org.mockito.MockitoAnnotations;
 import org.opendaylight.transportpce.nbinotifications.producer.Publisher;
-import org.opendaylight.transportpce.nbinotifications.producer.PublisherAlarm;
 import org.opendaylight.transportpce.test.AbstractTest;
 import org.opendaylight.yang.gen.v1.http.org.openroadm.common.service.types.rev190531.ConnectionType;
 import org.opendaylight.yang.gen.v1.http.org.openroadm.common.state.types.rev181130.State;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.NotificationAlarmService;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.NotificationService;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.PublishNotificationAlarmService;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.PublishNotificationAlarmServiceBuilder;
 import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.PublishNotificationService;
 import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.PublishNotificationServiceBuilder;
 
 public class NbiNotificationsListenerImplTest extends AbstractTest {
     @Mock
-    private Publisher publisher;
+    private Publisher<NotificationService> publisherService;
     @Mock
-    private PublisherAlarm publisherAlarm;
+    private Publisher<NotificationAlarmService> publisherAlarm;
 
     @Before
     public void setUp() {
@@ -37,23 +41,46 @@ public class NbiNotificationsListenerImplTest extends AbstractTest {
 
     @Test
     public void onPublishNotificationServiceTest() {
-        NbiNotificationsListenerImpl listener = new NbiNotificationsListenerImpl(Map.of("test", publisher),
+        NbiNotificationsListenerImpl listener = new NbiNotificationsListenerImpl(Map.of("test", publisherService),
                 Map.of("test", publisherAlarm));
         PublishNotificationService notification = new PublishNotificationServiceBuilder().setTopic("test")
                 .setCommonId("commonId").setConnectionType(ConnectionType.Service).setMessage("Service deleted")
                 .setOperationalState(State.OutOfService).setServiceName("service name").build();
         listener.onPublishNotificationService(notification);
-        verify(publisher, times(1)).sendEvent(any());
+        verify(publisherService, times(1)).sendEvent(any(), anyString());
     }
 
     @Test
     public void onPublishNotificationServiceWrongTopicTest() {
-        NbiNotificationsListenerImpl listener = new NbiNotificationsListenerImpl(Map.of("test", publisher),
+        NbiNotificationsListenerImpl listener = new NbiNotificationsListenerImpl(Map.of("test", publisherService),
                 Map.of("test", publisherAlarm));
         PublishNotificationService notification = new PublishNotificationServiceBuilder().setTopic("wrongtopic")
                 .setCommonId("commonId").setConnectionType(ConnectionType.Service).setMessage("Service deleted")
                 .setOperationalState(State.OutOfService).setServiceName("service name").build();
         listener.onPublishNotificationService(notification);
-        verify(publisher, times(0)).sendEvent(any());
+        verify(publisherService, times(0)).sendEvent(any(), anyString());
+    }
+
+    @Test
+    public void onPublishNotificationAlarmServiceTest() {
+        NbiNotificationsListenerImpl listener = new NbiNotificationsListenerImpl(Map.of("test", publisherService),
+                Map.of("test", publisherAlarm));
+        PublishNotificationAlarmService notification = new PublishNotificationAlarmServiceBuilder().setTopic("test")
+                .setConnectionType(ConnectionType.Service).setMessage("The service is now inService")
+                .setOperationalState(State.OutOfService).setServiceName("service name").build();
+        listener.onPublishNotificationAlarmService(notification);
+        verify(publisherAlarm, times(1)).sendEvent(any(), anyString());
+    }
+
+    @Test
+    public void onPublishNotificationAlarmServiceWrongTopicTest() {
+        NbiNotificationsListenerImpl listener = new NbiNotificationsListenerImpl(Map.of("test", publisherService),
+                Map.of("test", publisherAlarm));
+        PublishNotificationAlarmService notification = new PublishNotificationAlarmServiceBuilder()
+                .setTopic("wrongtopic").setConnectionType(ConnectionType.Service)
+                .setMessage("The service is now inService").setOperationalState(State.OutOfService)
+                .setServiceName("service name").build();
+        listener.onPublishNotificationAlarmService(notification);
+        verify(publisherAlarm, times(0)).sendEvent(any(), anyString());
     }
 }
index f738fd71d83569790b5260e8f624795d7564632b..7218a222cbc854a0d522afe307792625fe84f61e 100644 (file)
@@ -19,37 +19,58 @@ import org.junit.Before;
 import org.junit.Test;
 import org.opendaylight.transportpce.common.converter.JsonStringConverter;
 import org.opendaylight.transportpce.nbinotifications.serialization.ConfigConstants;
+import org.opendaylight.transportpce.nbinotifications.serialization.NotificationAlarmServiceSerializer;
 import org.opendaylight.transportpce.nbinotifications.serialization.NotificationServiceSerializer;
 import org.opendaylight.transportpce.test.AbstractTest;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.NotificationAlarmService;
 import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.NotificationService;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.codec.gson.JSONCodecFactorySupplier;
 
 public class PublisherTest extends AbstractTest {
-    private JsonStringConverter<NotificationService> converter;
-    private Publisher publisher;
+    private JsonStringConverter<NotificationService> converterService;
+    private JsonStringConverter<NotificationAlarmService> converterAlarm;
+    private Publisher<NotificationService> publisherService;
+    private Publisher<NotificationAlarmService> publisherAlarm;
     private MockProducer<String, NotificationService> mockProducer;
+    private MockProducer<String, NotificationAlarmService> mockAlarmProducer;
 
     @Before
     public void setUp() {
-        NotificationServiceSerializer serializer = new NotificationServiceSerializer();
-        Map<String, Object> properties = Map.of(ConfigConstants.CONVERTER , serializer);
-        serializer.configure(properties, false);
-        mockProducer =  new MockProducer<>(true, new StringSerializer(), serializer);
-        converter = new JsonStringConverter<NotificationService>(
-                getDataStoreContextUtil().getBindingDOMCodecServices());
-        publisher = new Publisher("test",mockProducer);
+        NotificationServiceSerializer serializerService = new NotificationServiceSerializer();
+        NotificationAlarmServiceSerializer serializerAlarm = new NotificationAlarmServiceSerializer();
+        Map<String, Object> properties = Map.of(ConfigConstants.CONVERTER, serializerService);
+        Map<String, Object> propertiesAlarm = Map.of(ConfigConstants.CONVERTER, serializerAlarm);
+        serializerService.configure(properties, false);
+        serializerAlarm.configure(propertiesAlarm, false);
+        mockProducer = new MockProducer<>(true, new StringSerializer(), serializerService);
+        mockAlarmProducer = new MockProducer<>(true, new StringSerializer(), serializerAlarm);
+        converterService = new JsonStringConverter<>(getDataStoreContextUtil().getBindingDOMCodecServices());
+        converterAlarm = new JsonStringConverter<>(getDataStoreContextUtil().getBindingDOMCodecServices());
+        publisherService = new Publisher<>("test", mockProducer);
+        publisherAlarm = new Publisher<>("test", mockAlarmProducer);
     }
 
     @Test
-    public void sendEventShouldBeSuccessful() throws IOException {
+    public void sendEventServiceShouldBeSuccessful() throws IOException {
         String json = Files.readString(Paths.get("src/test/resources/event.json"));
-        NotificationService notificationService = converter
+        NotificationService notificationService = converterService
                 .createDataObjectFromJsonString(YangInstanceIdentifier.of(NotificationService.QNAME),
                         json, JSONCodecFactorySupplier.RFC7951);
-        publisher.sendEvent(notificationService);
+        publisherService.sendEvent(notificationService, notificationService.getConnectionType().name());
         assertEquals("We should have one message", 1, mockProducer.history().size());
-        assertEquals("Key should be test", "test",mockProducer.history().get(0).key());
+        assertEquals("Key should be test", "test", mockProducer.history().get(0).key());
     }
 
+    @Test
+    public void sendEventAlarmShouldBeSuccessful() throws IOException {
+        String json = Files.readString(Paths.get("src/test/resources/event_alarm_service.json"));
+        NotificationAlarmService notificationAlarmService = converterAlarm
+                .createDataObjectFromJsonString(YangInstanceIdentifier.of(NotificationAlarmService.QNAME),
+                        json, JSONCodecFactorySupplier.RFC7951);
+        publisherAlarm.sendEvent(notificationAlarmService, "alarm"
+                + notificationAlarmService.getConnectionType().getName());
+        assertEquals("We should have one message", 1, mockAlarmProducer.history().size());
+        assertEquals("Key should be test", "test", mockAlarmProducer.history().get(0).key());
+    }
 }
diff --git a/nbinotifications/src/test/java/org/opendaylight/transportpce/nbinotifications/serialization/NotificationAlarmServiceDeserializerTest.java b/nbinotifications/src/test/java/org/opendaylight/transportpce/nbinotifications/serialization/NotificationAlarmServiceDeserializerTest.java
new file mode 100755 (executable)
index 0000000..2998b25
--- /dev/null
@@ -0,0 +1,38 @@
+/*
+ * Copyright © 2021 Orange, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.transportpce.nbinotifications.serialization;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.Map;
+import org.junit.Test;
+import org.opendaylight.transportpce.common.converter.JsonStringConverter;
+import org.opendaylight.transportpce.test.AbstractTest;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.get.notifications.alarm.service.output.NotificationAlarmService;
+
+public class NotificationAlarmServiceDeserializerTest extends AbstractTest {
+
+    @Test
+    public void deserializeTest() throws IOException {
+        JsonStringConverter<org.opendaylight.yang.gen.v1
+                .nbi.notifications.rev210628.NotificationAlarmService> converter =
+                new JsonStringConverter<>(getDataStoreContextUtil().getBindingDOMCodecServices());
+        NotificationAlarmServiceDeserializer deserializer = new NotificationAlarmServiceDeserializer();
+        Map<String, Object> configs = Map.of(ConfigConstants.CONVERTER, converter);
+        deserializer.configure(configs, false);
+        NotificationAlarmService readEvent = deserializer.deserialize("Test",
+                Files.readAllBytes(Paths.get("src/test/resources/event_alarm_service.json")));
+        deserializer.close();
+        assertEquals("Service name should be service1", "service1", readEvent.getServiceName());
+        assertEquals("message should be The service is now inService", "The service is now inService",
+                readEvent.getMessage());
+    }
+}
diff --git a/nbinotifications/src/test/java/org/opendaylight/transportpce/nbinotifications/serialization/NotificationAlarmServiceSerializerTest.java b/nbinotifications/src/test/java/org/opendaylight/transportpce/nbinotifications/serialization/NotificationAlarmServiceSerializerTest.java
new file mode 100755 (executable)
index 0000000..ed276cb
--- /dev/null
@@ -0,0 +1,48 @@
+/*
+ * Copyright © 2021 Orange, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.transportpce.nbinotifications.serialization;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.Map;
+import org.junit.Test;
+import org.opendaylight.transportpce.common.converter.JsonStringConverter;
+import org.opendaylight.transportpce.test.AbstractTest;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.NotificationAlarmService;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.codec.gson.JSONCodecFactorySupplier;
+
+public class NotificationAlarmServiceSerializerTest extends AbstractTest {
+
+    @Test
+    public void serializeTest() throws IOException {
+        JsonStringConverter<NotificationAlarmService> converter =
+                new JsonStringConverter<>(getDataStoreContextUtil().getBindingDOMCodecServices());
+        String json = Files.readString(Paths.get("src/test/resources/event_alarm_service.json"));
+        NotificationAlarmService notificationAlarmService = converter
+                .createDataObjectFromJsonString(YangInstanceIdentifier.of(NotificationAlarmService.QNAME),
+                        json, JSONCodecFactorySupplier.RFC7951);
+        NotificationAlarmServiceSerializer serializer = new NotificationAlarmServiceSerializer();
+        Map<String, Object> configs = Map.of(ConfigConstants.CONVERTER, converter);
+        serializer.configure(configs, false);
+        byte[] data = serializer.serialize("test", notificationAlarmService);
+        serializer.close();
+        assertNotNull("Serialized data should not be null", data);
+        String expectedJson = Files.readString(Paths.get("src/test/resources/expected_event_alarm_service.json"));
+        // Minify the json string
+        expectedJson = new ObjectMapper().readValue(expectedJson, JsonNode.class).toString();
+        assertEquals("The event should be equals", expectedJson, new String(data, StandardCharsets.UTF_8));
+    }
+}
index 0921e99c67c38a169739b766902a1e4ff4086407..c2ad2ff94e79f8144151138acbe3b493eeb7b03d 100644 (file)
@@ -58,6 +58,20 @@ public final class NotificationServiceDataUtils {
         return notificationServiceBuilder.build();
     }
 
+    public static org.opendaylight.yang.gen.v1
+            .nbi.notifications.rev210628.get.notifications.alarm.service.output.NotificationAlarmService
+            buildReceivedAlarmEvent() {
+        org.opendaylight.yang.gen.v1
+                .nbi.notifications.rev210628.get.notifications.alarm.service.output.NotificationAlarmServiceBuilder
+                notificationAlarmServiceBuilder = new org.opendaylight.yang.gen.v1
+                .nbi.notifications.rev210628.get.notifications.alarm.service.output.NotificationAlarmServiceBuilder()
+                .setMessage("message")
+                .setServiceName("service1")
+                .setOperationalState(State.InService)
+                .setConnectionType(ConnectionType.Service);
+        return notificationAlarmServiceBuilder.build();
+    }
+
     public static ServiceAEndBuilder getServiceAEndBuild() {
         return new ServiceAEndBuilder()
                 .setClli("clli").setServiceFormat(ServiceFormat.OC).setServiceRate(Uint32.valueOf(1))
diff --git a/nbinotifications/src/test/resources/event_alarm_service.json b/nbinotifications/src/test/resources/event_alarm_service.json
new file mode 100755 (executable)
index 0000000..7dfe5da
--- /dev/null
@@ -0,0 +1,8 @@
+{
+    "notification-alarm-service": {
+        "service-name": "service1",
+        "operational-state": "inService",
+        "message": "The service is now inService",
+        "connection-type": "service"
+    }
+}
diff --git a/nbinotifications/src/test/resources/expected_event_alarm_service.json b/nbinotifications/src/test/resources/expected_event_alarm_service.json
new file mode 100755 (executable)
index 0000000..403e6dc
--- /dev/null
@@ -0,0 +1,8 @@
+{
+  "notification-alarm-service": {
+    "operational-state": "inService",
+    "service-name": "service1",
+    "connection-type": "service",
+    "message": "The service is now inService"
+  }
+}
\ No newline at end of file
diff --git a/servicehandler/src/test/java/org/opendaylight/transportpce/servicehandler/listeners/ServiceListenerTest.java b/servicehandler/src/test/java/org/opendaylight/transportpce/servicehandler/listeners/ServiceListenerTest.java
new file mode 100755 (executable)
index 0000000..f02dec1
--- /dev/null
@@ -0,0 +1,196 @@
+/*
+ * Copyright © 2021 Orange.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.transportpce.servicehandler.listeners;
+
+import static org.junit.Assert.fail;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.Collection;
+import java.util.HashSet;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+import org.opendaylight.mdsal.binding.api.DataBroker;
+import org.opendaylight.mdsal.binding.api.DataObjectModification;
+import org.opendaylight.mdsal.binding.api.DataTreeModification;
+import org.opendaylight.mdsal.binding.api.NotificationPublishService;
+import org.opendaylight.yang.gen.v1.http.org.openroadm.common.node.types.rev181130.NodeIdType;
+import org.opendaylight.yang.gen.v1.http.org.openroadm.common.service.types.rev190531.ConnectionType;
+import org.opendaylight.yang.gen.v1.http.org.openroadm.common.service.types.rev190531.service.endpoint.RxDirection;
+import org.opendaylight.yang.gen.v1.http.org.openroadm.common.service.types.rev190531.service.endpoint.TxDirection;
+import org.opendaylight.yang.gen.v1.http.org.openroadm.common.service.types.rev190531.service.lgx.LgxBuilder;
+import org.opendaylight.yang.gen.v1.http.org.openroadm.common.service.types.rev190531.service.port.PortBuilder;
+import org.opendaylight.yang.gen.v1.http.org.openroadm.common.state.types.rev181130.State;
+import org.opendaylight.yang.gen.v1.http.org.openroadm.equipment.states.types.rev181130.AdminStates;
+import org.opendaylight.yang.gen.v1.http.org.openroadm.service.format.rev190531.ServiceFormat;
+import org.opendaylight.yang.gen.v1.http.org.openroadm.service.rev190531.service.list.Services;
+import org.opendaylight.yang.gen.v1.http.org.openroadm.service.rev190531.service.list.ServicesBuilder;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.PublishNotificationAlarmService;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.PublishNotificationAlarmServiceBuilder;
+import org.opendaylight.yangtools.yang.common.Uint32;
+
+@RunWith(MockitoJUnitRunner.StrictStubs.class)
+public class ServiceListenerTest {
+
+    @Mock
+    private DataBroker dataBroker;
+    @Mock
+    private NotificationPublishService notificationPublishService;
+
+    @Test
+    public void testOnDataTreeChangedWhenDeleteService() {
+        @SuppressWarnings("unchecked") final DataObjectModification<Services> service =
+                mock(DataObjectModification.class);
+        final Collection<DataTreeModification<Services>> changes = new HashSet<>();
+        @SuppressWarnings("unchecked") final DataTreeModification<Services> ch = mock(DataTreeModification.class);
+        changes.add(ch);
+        when(ch.getRootNode()).thenReturn(service);
+
+        when(service.getModificationType()).thenReturn(DataObjectModification.ModificationType.DELETE);
+        when(service.getDataBefore()).thenReturn(buildService(State.InService, AdminStates.InService));
+        ServiceListener listener = new ServiceListener(dataBroker, notificationPublishService);
+        listener.onDataTreeChanged(changes);
+        verify(ch, times(1)).getRootNode();
+        verify(service, times(1)).getModificationType();
+        verify(service, times(2)).getDataBefore();
+        verify(service, never()).getDataAfter();
+        try {
+            verify(notificationPublishService, never()).putNotification(any(PublishNotificationAlarmService.class));
+        } catch (InterruptedException e) {
+            fail("Failed publishing notification");
+        }
+    }
+
+    @Test
+    public void testOnDataTreeChangedWhenAddService() {
+        @SuppressWarnings("unchecked") final DataObjectModification<Services> service =
+                mock(DataObjectModification.class);
+        final Collection<DataTreeModification<Services>> changes = new HashSet<>();
+        @SuppressWarnings("unchecked") final DataTreeModification<Services> ch = mock(DataTreeModification.class);
+        changes.add(ch);
+        when(ch.getRootNode()).thenReturn(service);
+
+        Services serviceDown = buildService(State.OutOfService, AdminStates.OutOfService);
+        when(service.getModificationType()).thenReturn(DataObjectModification.ModificationType.WRITE);
+        when(service.getDataBefore()).thenReturn(buildService(State.InService, AdminStates.InService));
+        when(service.getDataAfter()).thenReturn(serviceDown);
+        ServiceListener listener = new ServiceListener(dataBroker, notificationPublishService);
+        listener.onDataTreeChanged(changes);
+        verify(ch, times(1)).getRootNode();
+        verify(service, times(1)).getModificationType();
+        verify(service, times(3)).getDataBefore();
+        verify(service, times(2)).getDataAfter();
+        PublishNotificationAlarmService publishNotificationAlarmService =
+                buildNotificationAlarmService(serviceDown, "The service is now outOfService");
+        try {
+            verify(notificationPublishService, times(1))
+                    .putNotification(publishNotificationAlarmService);
+        } catch (InterruptedException e) {
+            fail("Failed publishing notification");
+        }
+    }
+
+    @Test
+    public void testOnDataTreeChangedWhenShouldNeverHappen() {
+        @SuppressWarnings("unchecked") final DataObjectModification<Services> service =
+                mock(DataObjectModification.class);
+        final Collection<DataTreeModification<Services>> changes = new HashSet<>();
+        @SuppressWarnings("unchecked") final DataTreeModification<Services> ch = mock(DataTreeModification.class);
+        changes.add(ch);
+        when(ch.getRootNode()).thenReturn(service);
+
+        when(service.getModificationType()).thenReturn(DataObjectModification.ModificationType.SUBTREE_MODIFIED);
+        when(service.getDataBefore()).thenReturn(buildService(State.InService, AdminStates.InService));
+        ServiceListener listener = new ServiceListener(dataBroker, notificationPublishService);
+        listener.onDataTreeChanged(changes);
+        verify(ch, times(1)).getRootNode();
+        verify(service, times(2)).getModificationType();
+        verify(service, times(2)).getDataBefore();
+        verify(service, never()).getDataAfter();
+        try {
+            verify(notificationPublishService, never()).putNotification(any(PublishNotificationAlarmService.class));
+        } catch (InterruptedException e) {
+            fail("Failed publishing notification");
+        }
+    }
+
+    private Services buildService(State state, AdminStates adminStates) {
+        org.opendaylight.yang.gen.v1.http.org.openroadm.common.service.types.rev190531.service.ServiceAEnd
+                serviceAEnd = getServiceAEndBuild()
+                .build();
+        org.opendaylight.yang.gen.v1.http.org.openroadm.common.service.types.rev190531.service
+                .ServiceZEnd serviceZEnd = new org.opendaylight.yang.gen.v1
+                .http.org.openroadm.common.service.types.rev190531.service.ServiceZEndBuilder()
+                .setClli("clli").setServiceFormat(ServiceFormat.OC).setServiceRate(Uint32.valueOf(1))
+                .setNodeId(new NodeIdType("XPONDER-3-2"))
+                .setTxDirection(getTxDirection())
+                .setRxDirection(getRxDirection())
+                .build();
+
+        ServicesBuilder builtInput = new ServicesBuilder()
+                .setCommonId("commonId")
+                .setConnectionType(ConnectionType.Service)
+                .setCustomer("Customer")
+                .setServiceName("service 1")
+                .setServiceAEnd(serviceAEnd)
+                .setServiceZEnd(serviceZEnd)
+                .setOperationalState(state)
+                .setAdministrativeState(adminStates);
+
+        return builtInput.build();
+    }
+
+    private org.opendaylight.yang.gen.v1
+            .http.org.openroadm.common.service.types.rev190531.service.ServiceAEndBuilder getServiceAEndBuild() {
+        return new org.opendaylight.yang.gen.v1.http.org.openroadm.common.service.types.rev190531.service
+                .ServiceAEndBuilder()
+                .setClli("clli").setServiceFormat(ServiceFormat.OC).setServiceRate(Uint32.valueOf(1))
+                .setNodeId(new NodeIdType("XPONDER-1-2"))
+                .setTxDirection(getTxDirection())
+                .setRxDirection(getRxDirection());
+    }
+
+    private TxDirection getTxDirection() {
+        return new org.opendaylight.yang.gen.v1.http.org.openroadm.common.service.types.rev190531.service
+                .endpoint.TxDirectionBuilder().setPort(new PortBuilder().setPortDeviceName("device name")
+                .setPortName("port name").setPortRack("port rack").setPortShelf("port shelf")
+                .setPortSlot("port slot").setPortSubSlot("port subslot").setPortType("port type").build())
+                .setLgx(new LgxBuilder().setLgxDeviceName("lgx device name").setLgxPortName("lgx port name")
+                        .setLgxPortRack("lgx port rack").setLgxPortShelf("lgx port shelf").build())
+                .build();
+    }
+
+    private RxDirection getRxDirection() {
+        return new org.opendaylight.yang.gen.v1.http.org.openroadm.common.service.types.rev190531.service
+                .endpoint.RxDirectionBuilder()
+                .setPort(new PortBuilder().setPortDeviceName("device name").setPortName("port name")
+                        .setPortRack("port rack").setPortShelf("port shelf").setPortSlot("port slot")
+                        .setPortSubSlot("port subslot").setPortType("port type").build())
+                .setLgx(new LgxBuilder().setLgxDeviceName("lgx device name")
+                        .setLgxPortName("lgx port name").setLgxPortRack("lgx port rack")
+                        .setLgxPortShelf("lgx port shelf").build())
+                .build();
+    }
+
+    private PublishNotificationAlarmService buildNotificationAlarmService(Services services, String message) {
+        return new PublishNotificationAlarmServiceBuilder()
+                .setServiceName("service 1")
+                .setConnectionType(ConnectionType.Service)
+                .setMessage(message)
+                .setOperationalState(services.getOperationalState())
+                .setTopic("ServiceListener")
+                .build();
+    }
+}
index 00bcae7eb135930ae57542970aa3ae431a5b31e4..27ff94b5947f6095c375b830052a7ec9694c4f37 100644 (file)
@@ -12,6 +12,7 @@
 # pylint: disable=too-many-public-methods
 
 import os
+import json
 import sys
 import unittest
 import time
@@ -307,7 +308,74 @@ class TransportNbiNotificationstesting(unittest.TestCase):
         self.assertEqual(res['output']['notification-service'][-1]['message'], 'Service implemented !')
         time.sleep(2)
 
-    def test_17_delete_eth_service1(self):
+    def test_17_get_notifications_alarm_service1(self):
+        data = {
+            "input": {
+                "connection-type": "service",
+                "id-consumer": "consumer",
+                "group-id": "transportpceTest"
+            }
+        }
+        response = test_utils.get_notifications_alarm_service_request(data)
+        self.assertEqual(response.status_code, requests.codes.ok)
+        res = response.json()
+        self.assertEqual(res['output']['notification-alarm-service'][-1]['service-name'], 'service1')
+        self.assertEqual(res['output']['notification-alarm-service'][-1]['connection-type'], 'service')
+        self.assertEqual(res['output']['notification-alarm-service'][-1]['operational-state'], 'inService')
+        self.assertEqual(res['output']['notification-alarm-service'][-1]['message'], 'The service is now inService')
+        time.sleep(2)
+
+    def test_18_change_status_port_roadma_srg(self):
+        url = "{}/config/org-openroadm-device:org-openroadm-device/circuit-packs/3%2F0/ports/C1"
+        body = {"ports": [{
+            "port-name": "C1",
+            "logical-connection-point": "SRG1-PP1",
+            "port-type": "client",
+            "circuit-id": "SRG1",
+            "administrative-state": "outOfService",
+            "port-qual": "roadm-external"}]}
+        response = requests.request("PUT", url.format("http://127.0.0.1:8141/restconf"),
+                                    data=json.dumps(body), headers=test_utils.TYPE_APPLICATION_JSON,
+                                    auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
+        self.assertEqual(response.status_code, requests.codes.ok)
+        time.sleep(2)
+
+    def test_19_get_notifications_alarm_service1(self):
+        data = {
+            "input": {
+                "connection-type": "service",
+                "id-consumer": "consumer",
+                "group-id": "transportpceTest"
+            }
+        }
+        response = test_utils.get_notifications_alarm_service_request(data)
+        self.assertEqual(response.status_code, requests.codes.ok)
+        res = response.json()
+        self.assertEqual(res['output']['notification-alarm-service'][-1]['service-name'], 'service1')
+        self.assertEqual(res['output']['notification-alarm-service'][-1]['connection-type'], 'service')
+        self.assertEqual(res['output']['notification-alarm-service'][-1]['operational-state'], 'outOfService')
+        self.assertEqual(res['output']['notification-alarm-service'][-1]['message'], 'The service is now outOfService')
+        time.sleep(2)
+
+    def test_20_restore_status_port_roadma_srg(self):
+        url = "{}/config/org-openroadm-device:org-openroadm-device/circuit-packs/3%2F0/ports/C1"
+        body = {"ports": [{
+            "port-name": "C1",
+            "logical-connection-point": "SRG1-PP1",
+            "port-type": "client",
+            "circuit-id": "SRG1",
+            "administrative-state": "inService",
+            "port-qual": "roadm-external"}]}
+        response = requests.request("PUT", url.format("http://127.0.0.1:8141/restconf"),
+                                    data=json.dumps(body), headers=test_utils.TYPE_APPLICATION_JSON,
+                                    auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
+        self.assertEqual(response.status_code, requests.codes.ok)
+        time.sleep(2)
+
+    def test_21_get_notifications_alarm_service1(self):
+        self.test_17_get_notifications_alarm_service1()
+
+    def test_22_delete_eth_service1(self):
         response = test_utils.service_delete_request("service1")
         self.assertEqual(response.status_code, requests.codes.ok)
         res = response.json()
@@ -315,7 +383,7 @@ class TransportNbiNotificationstesting(unittest.TestCase):
                       res['output']['configuration-response-common']['response-message'])
         time.sleep(20)
 
-    def test_18_get_notifications_service1(self):
+    def test_23_get_notifications_service1(self):
         data = {
             "input": {
                 "connection-type": "service",
@@ -331,19 +399,19 @@ class TransportNbiNotificationstesting(unittest.TestCase):
         self.assertEqual(res['output']['notification-service'][-1]['message'], 'Service deleted !')
         time.sleep(2)
 
-    def test_19_disconnect_XPDRA(self):
+    def test_24_disconnect_XPDRA(self):
         response = test_utils.unmount_device("XPDR-A1")
         self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
 
-    def test_20_disconnect_XPDRC(self):
+    def test_25_disconnect_XPDRC(self):
         response = test_utils.unmount_device("XPDR-C1")
         self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
 
-    def test_21_disconnect_ROADMA(self):
+    def test_26_disconnect_ROADMA(self):
         response = test_utils.unmount_device("ROADM-A1")
         self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
 
-    def test_22_disconnect_ROADMC(self):
+    def test_27_disconnect_ROADMC(self):
         response = test_utils.unmount_device("ROADM-C1")
         self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
 
index d709975a31a2bb69880e777ee9c948488014c94e..ffaaab8ff6cd7315dd6527417b9bb919434ceccc 100644 (file)
@@ -44,6 +44,7 @@ URL_CONFIG_ORDM_NET = "{}/config/ietf-network:networks/network/openroadm-network
 URL_PORTMAPPING = "{}/config/transportpce-portmapping:network/nodes/"
 URL_OPER_SERV_LIST = "{}/operational/org-openroadm-service:service-list/"
 URL_GET_NBINOTIFICATIONS_SERV = "{}/operations/nbi-notifications:get-notifications-service/"
+URL_GET_NBINOTIFICATIONS_ALARM_SERV = "{}/operations/nbi-notifications:get-notifications-alarm-service/"
 URL_SERV_CREATE = "{}/operations/org-openroadm-service:service-create"
 URL_SERV_DELETE = "{}/operations/org-openroadm-service:service-delete"
 URL_SERVICE_PATH = "{}/operations/transportpce-device-renderer:service-path"
@@ -345,6 +346,10 @@ def get_notifications_service_request(attr):
     return post_request(URL_GET_NBINOTIFICATIONS_SERV, attr)
 
 
+def get_notifications_alarm_service_request(attr):
+    return post_request(URL_GET_NBINOTIFICATIONS_ALARM_SERV, attr)
+
+
 def get_service_list_request(suffix: str):
     url = URL_OPER_SERV_LIST + suffix
     return get_request(url)