import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
+import org.eclipse.jdt.annotation.NonNull;
import org.opendaylight.transportpce.common.converter.JsonStringConverter;
import org.opendaylight.transportpce.nbinotifications.serialization.ConfigConstants;
-import org.opendaylight.transportpce.nbinotifications.serialization.NotificationServiceDeserializer;
import org.opendaylight.transportpce.nbinotifications.utils.NbiNotificationsUtils;
-import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.get.notifications.service.output.NotificationService;
+import org.opendaylight.yangtools.yang.binding.DataObject;
+import org.opendaylight.yangtools.yang.common.QName;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class Subscriber {
+public class Subscriber<T extends DataObject, D> {
private static final Logger LOG = LoggerFactory.getLogger(Subscriber.class);
- private final Consumer<String, NotificationService> consumer;
+ private final Consumer<String, D> consumer;
- public Subscriber(String id, String groupId, String suscriberServer,
- JsonStringConverter<org.opendaylight.yang.gen.v1
- .nbi.notifications.rev210628.NotificationService> deserializer) {
+ public Subscriber(String id, String groupId, String subscriberServer, JsonStringConverter<T> deserializer,
+ Class<?> deserializerConf) {
Properties propsConsumer = NbiNotificationsUtils.loadProperties("subscriber.properties");
propsConsumer.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
propsConsumer.put(ConsumerConfig.CLIENT_ID_CONFIG, id);
propsConsumer.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
- propsConsumer.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG , NotificationServiceDeserializer.class);
+ propsConsumer.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG , deserializerConf);
propsConsumer.put(ConfigConstants.CONVERTER , deserializer);
- if (suscriberServer != null && !suscriberServer.isBlank()) {
- propsConsumer.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, suscriberServer);
+ if (subscriberServer != null && !subscriberServer.isBlank()) {
+ propsConsumer.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, subscriberServer);
}
- LOG.info("Suscribing for group id {}, client config id {} with properties {}", groupId, id, propsConsumer);
+ LOG.info("Subscribing for group id {}, client config id {} with properties {}", groupId, id, propsConsumer);
consumer = new KafkaConsumer<>(propsConsumer);
}
- public List<NotificationService> subscribeService(String topicName) {
+ public List<D> subscribe(String topicName, @NonNull QName name) {
LOG.info("Subscribe request to topic '{}' ", topicName);
consumer.subscribe(Collections.singleton(topicName));
- final ConsumerRecords<String, NotificationService> consumerRecords = consumer.poll(Duration.ofMillis(1000));
- List<NotificationService> notificationServiceList = new ArrayList<>();
- YangInstanceIdentifier.of(NotificationService.QNAME);
- for (ConsumerRecord<String, NotificationService> record : consumerRecords) {
+ final ConsumerRecords<String, D> consumerRecords = consumer.poll(Duration.ofMillis(1000));
+ List<D> notificationServiceList = new ArrayList<>();
+ YangInstanceIdentifier.of(name);
+ for (ConsumerRecord<String, D> record : consumerRecords) {
if (record.value() != null) {
notificationServiceList.add(record.value());
}
return notificationServiceList;
}
- @VisibleForTesting public Subscriber(Consumer<String, NotificationService> consumer) {
+ @VisibleForTesting public Subscriber(Consumer<String, D> consumer) {
this.consumer = consumer;
}
}
+++ /dev/null
-/*
- * Copyright © 2021 Orange, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.transportpce.nbinotifications.consumer;
-
-import com.google.common.annotations.VisibleForTesting;
-import java.time.Duration;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Properties;
-import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.opendaylight.transportpce.common.converter.JsonStringConverter;
-import org.opendaylight.transportpce.nbinotifications.serialization.ConfigConstants;
-import org.opendaylight.transportpce.nbinotifications.serialization.NotificationAlarmServiceDeserializer;
-import org.opendaylight.transportpce.nbinotifications.utils.NbiNotificationsUtils;
-import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.get.notifications.alarm.service.output.NotificationAlarmService;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class SubscriberAlarm {
- private static final Logger LOG = LoggerFactory.getLogger(SubscriberAlarm.class);
-
- private final Consumer<String, NotificationAlarmService> consumer;
-
- public SubscriberAlarm(String id, String groupId, String subscriberServer,
- JsonStringConverter<org.opendaylight.yang.gen.v1
- .nbi.notifications.rev210628.NotificationAlarmService> deserializer) {
- Properties propsConsumer = NbiNotificationsUtils.loadProperties("subscriber.properties");
- propsConsumer.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
- propsConsumer.put(ConsumerConfig.CLIENT_ID_CONFIG, id);
- propsConsumer.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
- propsConsumer.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG , NotificationAlarmServiceDeserializer.class);
- propsConsumer.put(ConfigConstants.CONVERTER , deserializer);
- if (subscriberServer != null && !subscriberServer.isBlank()) {
- propsConsumer.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, subscriberServer);
- }
- LOG.info("Subscribing for group id {}, client config id {} with properties {}", groupId, id, propsConsumer);
- consumer = new KafkaConsumer<>(propsConsumer);
- }
-
- public List<NotificationAlarmService> subscribeAlarm(String topicName) {
- LOG.info("Subscribe request to topic '{}' ", topicName);
- consumer.subscribe(Collections.singleton(topicName));
- final ConsumerRecords<String, NotificationAlarmService> consumerRecords = consumer
- .poll(Duration.ofMillis(1000));
- List<NotificationAlarmService> notificationAlarmServiceList = new ArrayList<>();
- YangInstanceIdentifier.of(NotificationAlarmService.QNAME);
- for (ConsumerRecord<String, NotificationAlarmService> record : consumerRecords) {
- if (record.value() != null) {
- notificationAlarmServiceList.add(record.value());
- }
- }
- LOG.info("Getting records '{}' ", notificationAlarmServiceList);
- consumer.unsubscribe();
- consumer.close();
- return notificationAlarmServiceList;
- }
-
- @VisibleForTesting public SubscriberAlarm(Consumer<String, NotificationAlarmService> consumer) {
- this.consumer = consumer;
- }
-}
import java.util.List;
import org.opendaylight.transportpce.common.converter.JsonStringConverter;
import org.opendaylight.transportpce.nbinotifications.consumer.Subscriber;
-import org.opendaylight.transportpce.nbinotifications.consumer.SubscriberAlarm;
+import org.opendaylight.transportpce.nbinotifications.serialization.NotificationAlarmServiceDeserializer;
+import org.opendaylight.transportpce.nbinotifications.serialization.NotificationServiceDeserializer;
import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.GetNotificationsAlarmServiceInput;
import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.GetNotificationsAlarmServiceOutput;
import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.GetNotificationsAlarmServiceOutputBuilder;
LOG.warn("Missing mandatory params for input {}", input);
return RpcResultBuilder.success(new GetNotificationsServiceOutputBuilder().build()).buildFuture();
}
- Subscriber subscriber = new Subscriber(input.getIdConsumer(), input.getGroupId(), server, converterService);
+ Subscriber<org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.NotificationService,
+ NotificationService> subscriber = new Subscriber<>(input.getIdConsumer(), input.getGroupId(), server,
+ converterService, NotificationServiceDeserializer.class);
List<NotificationService> notificationServiceList = subscriber
- .subscribeService(input.getConnectionType().getName());
+ .subscribe(input.getConnectionType().getName(), NotificationService.QNAME);
GetNotificationsServiceOutputBuilder output = new GetNotificationsServiceOutputBuilder()
.setNotificationService(notificationServiceList);
return RpcResultBuilder.success(output.build()).buildFuture();
LOG.warn("Missing mandatory params for input {}", input);
return RpcResultBuilder.success(new GetNotificationsAlarmServiceOutputBuilder().build()).buildFuture();
}
- SubscriberAlarm subscriberAlarm = new SubscriberAlarm(input.getIdConsumer(), input.getGroupId(), server,
- converterAlarmService);
- List<NotificationAlarmService> notificationAlarmServiceList = subscriberAlarm
- .subscribeAlarm("alarm" + input.getConnectionType().getName());
+ Subscriber<org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.NotificationAlarmService,
+ NotificationAlarmService> subscriber = new Subscriber<>(input.getIdConsumer(), input.getGroupId(),
+ server, converterAlarmService, NotificationAlarmServiceDeserializer.class);
+ List<NotificationAlarmService> notificationAlarmServiceList = subscriber
+ .subscribe("alarm" + input.getConnectionType().getName(), NotificationAlarmService.QNAME);
GetNotificationsAlarmServiceOutputBuilder output = new GetNotificationsAlarmServiceOutputBuilder()
.setNotificationAlarmService(notificationAlarmServiceList);
return RpcResultBuilder.success(output.build()).buildFuture();
import org.opendaylight.transportpce.common.converter.JsonStringConverter;
import org.opendaylight.transportpce.nbinotifications.listener.NbiNotificationsListenerImpl;
import org.opendaylight.transportpce.nbinotifications.producer.Publisher;
-import org.opendaylight.transportpce.nbinotifications.producer.PublisherAlarm;
+import org.opendaylight.transportpce.nbinotifications.serialization.NotificationAlarmServiceSerializer;
+import org.opendaylight.transportpce.nbinotifications.serialization.NotificationServiceSerializer;
import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.NbiNotificationsListener;
import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.NbiNotificationsService;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.NotificationAlarmService;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
import org.opendaylight.yangtools.concepts.ObjectRegistration;
import org.slf4j.Logger;
public class NbiNotificationsProvider {
private static final Logger LOG = LoggerFactory.getLogger(NbiNotificationsProvider.class);
- private static Map<String, Publisher> publishersServiceMap = new HashMap<>();
- private static Map<String, PublisherAlarm> publishersAlarmMap = new HashMap<>();
+ private static Map<String, Publisher<org.opendaylight.yang.gen.v1
+ .nbi.notifications.rev210628.NotificationService>> publishersServiceMap = new HashMap<>();
+ private static Map<String, Publisher<NotificationAlarmService>> publishersAlarmMap = new HashMap<>();
private final RpcProviderService rpcService;
private ObjectRegistration<NbiNotificationsService> rpcRegistration;
private NotificationService notificationService;
private final JsonStringConverter<org.opendaylight.yang.gen.v1
.nbi.notifications.rev210628.NotificationService> converterService;
- private final JsonStringConverter<org.opendaylight.yang.gen.v1
- .nbi.notifications.rev210628.NotificationAlarmService> converterAlarmService;
+ private final JsonStringConverter<NotificationAlarmService> converterAlarmService;
private final String subscriberServer;
converterService = new JsonStringConverter<>(bindingDOMCodecServices);
for (String topic: topicsService) {
LOG.info("Creating publisher for topic {}", topic);
- publishersServiceMap.put(topic, new Publisher(topic, publisherServer, converterService));
+ publishersServiceMap.put(topic, new Publisher<>(topic, publisherServer, converterService,
+ NotificationServiceSerializer.class));
}
converterAlarmService = new JsonStringConverter<>(bindingDOMCodecServices);
for (String topic: topicsAlarm) {
LOG.info("Creating publisher for topic {}", topic);
- publishersAlarmMap.put(topic, new PublisherAlarm(topic, publisherServer, converterAlarmService));
+ publishersAlarmMap.put(topic, new Publisher<>(topic, publisherServer, converterAlarmService,
+ NotificationAlarmServiceSerializer.class));
}
this.subscriberServer = subscriberServer;
}
* Method called when the blueprint container is destroyed.
*/
public void close() {
- for (Publisher publisher : publishersServiceMap.values()) {
+ for (Publisher<org.opendaylight.yang.gen.v1
+ .nbi.notifications.rev210628.NotificationService> publisher : publishersServiceMap.values()) {
publisher.close();
}
- for (PublisherAlarm publisherAlarm : publishersAlarmMap.values()) {
+ for (Publisher<NotificationAlarmService> publisherAlarm : publishersAlarmMap.values()) {
publisherAlarm.close();
}
rpcRegistration.close();
import java.util.Map;
import org.opendaylight.transportpce.nbinotifications.producer.Publisher;
-import org.opendaylight.transportpce.nbinotifications.producer.PublisherAlarm;
import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.NbiNotificationsListener;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.NotificationAlarmService;
import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.NotificationAlarmServiceBuilder;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.NotificationService;
import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.NotificationServiceBuilder;
import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.PublishNotificationAlarmService;
import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.PublishNotificationService;
public class NbiNotificationsListenerImpl implements NbiNotificationsListener {
private static final Logger LOG = LoggerFactory.getLogger(NbiNotificationsListenerImpl.class);
- private Map<String, Publisher> publishersServiceMap;
- private Map<String, PublisherAlarm> publishersAlarmMap;
+ private final Map<String, Publisher<NotificationService>> publishersServiceMap;
+ private final Map<String, Publisher<NotificationAlarmService>> publishersAlarmMap;
- public NbiNotificationsListenerImpl(Map<String, Publisher> publishersServiceMap,
- Map<String, PublisherAlarm> publishersAlarmMap) {
+ public NbiNotificationsListenerImpl(Map<String, Publisher<NotificationService>> publishersServiceMap,
+ Map<String, Publisher<NotificationAlarmService>> publishersAlarmMap) {
this.publishersServiceMap = publishersServiceMap;
this.publishersAlarmMap = publishersAlarmMap;
}
LOG.error("Unknown topic {}", topic);
return;
}
- Publisher publisher = publishersServiceMap.get(topic);
+ Publisher<NotificationService> publisher = publishersServiceMap.get(topic);
publisher.sendEvent(new NotificationServiceBuilder().setCommonId(notification.getCommonId())
.setConnectionType(notification.getConnectionType()).setMessage(notification.getMessage())
.setOperationalState(notification.getOperationalState())
.setResponseFailed(notification.getResponseFailed())
.setServiceAEnd(notification.getServiceAEnd())
.setServiceName(notification.getServiceName())
- .setServiceZEnd(notification.getServiceZEnd()).build());
+ .setServiceZEnd(notification.getServiceZEnd()).build(), notification.getConnectionType().getName());
}
@Override
LOG.error("Unknown topic {}", topic);
return;
}
- PublisherAlarm publisherAlarm = publishersAlarmMap.get(topic);
+ Publisher<NotificationAlarmService> publisherAlarm = publishersAlarmMap.get(topic);
publisherAlarm.sendEvent(new NotificationAlarmServiceBuilder().setConnectionType(notification
.getConnectionType())
.setMessage(notification.getMessage())
.setOperationalState(notification.getOperationalState())
.setServiceName(notification.getServiceName())
- .build());
+ .build(), "alarm" + notification.getConnectionType().getName());
}
}
import org.apache.kafka.common.serialization.StringSerializer;
import org.opendaylight.transportpce.common.converter.JsonStringConverter;
import org.opendaylight.transportpce.nbinotifications.serialization.ConfigConstants;
-import org.opendaylight.transportpce.nbinotifications.serialization.NotificationServiceSerializer;
import org.opendaylight.transportpce.nbinotifications.utils.NbiNotificationsUtils;
-import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.NotificationService;
+import org.opendaylight.yangtools.yang.binding.DataObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class Publisher {
+public class Publisher<T extends DataObject> {
private static final Logger LOG = LoggerFactory.getLogger(Publisher.class);
private final String id;
- private final Producer<String, NotificationService> producer;
+ private final Producer<String, T> producer;
- public Publisher(String id, String publisherServer, JsonStringConverter<NotificationService> serializer) {
+ public Publisher(String id, String publisherServer, JsonStringConverter<T> serializer, Class<?> serializerConf) {
Properties properties = NbiNotificationsUtils.loadProperties("publisher.properties");
properties.put(ProducerConfig.CLIENT_ID_CONFIG, id);
if (publisherServer != null && !publisherServer.isBlank()) {
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, publisherServer);
}
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
- properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG , NotificationServiceSerializer.class);
+ properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG , serializerConf);
properties.put(ConfigConstants.CONVERTER , serializer);
- LOG.info("Creationg publisher for id {} with properties {}", id, properties);
+ LOG.info("Creation publisher for id {} with properties {}", id, properties);
producer = new KafkaProducer<>(properties);
this.id = id;
}
- @VisibleForTesting Publisher(String id, Producer<String, NotificationService> producer) {
+ @VisibleForTesting Publisher(String id, Producer<String, T> producer) {
this.producer = producer;
this.id = id;
}
producer.close();
}
- public void sendEvent(NotificationService notificationService) {
- LOG.info("SendEvent request to topic '{}' ", notificationService.getConnectionType().getName());
- producer.send(new ProducerRecord<>(notificationService.getConnectionType().getName(), id, notificationService));
+ public void sendEvent(T notification, String topic) {
+ LOG.info("SendEvent request to topic '{}' ", topic);
+ producer.send(new ProducerRecord<>(topic, id, notification));
producer.flush();
}
}
+++ /dev/null
-/*
- * Copyright © 2021 Orange, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.transportpce.nbinotifications.producer;
-
-import com.google.common.annotations.VisibleForTesting;
-import java.util.Properties;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.Producer;
-import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.opendaylight.transportpce.common.converter.JsonStringConverter;
-import org.opendaylight.transportpce.nbinotifications.serialization.ConfigConstants;
-import org.opendaylight.transportpce.nbinotifications.serialization.NotificationAlarmServiceSerializer;
-import org.opendaylight.transportpce.nbinotifications.utils.NbiNotificationsUtils;
-import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.NotificationAlarmService;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class PublisherAlarm {
- private static final Logger LOG = LoggerFactory.getLogger(PublisherAlarm.class);
-
- private final String id;
- private final Producer<String, NotificationAlarmService> producer;
-
- public PublisherAlarm(String id, String publisherServer, JsonStringConverter<NotificationAlarmService> serializer) {
- Properties properties = NbiNotificationsUtils.loadProperties("publisher.properties");
- properties.put(ProducerConfig.CLIENT_ID_CONFIG, id);
- if (publisherServer != null && !publisherServer.isBlank()) {
- properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, publisherServer);
- }
- properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
- properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG , NotificationAlarmServiceSerializer.class);
- properties.put(ConfigConstants.CONVERTER , serializer);
- LOG.info("Creationg publisher for id {} with properties {}", id, properties);
- producer = new KafkaProducer<>(properties);
- this.id = id;
- }
-
- @VisibleForTesting
- PublisherAlarm(String id, Producer<String, NotificationAlarmService> producer) {
- this.producer = producer;
- this.id = id;
- }
-
- public void close() {
- producer.close();
- }
-
- public void sendEvent(NotificationAlarmService notificationAlarmService) {
- LOG.info("SendEvent request to topic '{}' ", notificationAlarmService.getConnectionType().getName());
- producer.send(new ProducerRecord<>("alarm" + notificationAlarmService.getConnectionType().getName(),
- id, notificationAlarmService));
- producer.flush();
- }
-}
import org.junit.Test;
import org.opendaylight.transportpce.nbinotifications.utils.NotificationServiceDataUtils;
import org.opendaylight.transportpce.test.AbstractTest;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.get.notifications.alarm.service.output.NotificationAlarmService;
import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.get.notifications.service.output.NotificationService;
public class SubscriberTest extends AbstractTest {
private static final String TOPIC = "topic";
private static final int PARTITION = 0;
private MockConsumer<String, NotificationService> mockConsumer;
- private Subscriber subscriber;
+ private MockConsumer<String, NotificationAlarmService> mockConsumerAlarm;
+ private Subscriber<org.opendaylight.yang.gen.v1
+ .nbi.notifications.rev210628.NotificationService, NotificationService> subscriberService;
+ private Subscriber<org.opendaylight.yang.gen.v1
+ .nbi.notifications.rev210628.NotificationAlarmService, NotificationAlarmService> subscriberAlarmService;
@Before
public void setUp() {
mockConsumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
- subscriber = new Subscriber(mockConsumer);
+ mockConsumerAlarm = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
+ subscriberService = new Subscriber<>(mockConsumer);
+ subscriberAlarmService = new Subscriber<>(mockConsumerAlarm);
}
@Test
public void subscribeServiceShouldBeSuccessful() {
// from https://www.baeldung.com/kafka-mockconsumer
- ConsumerRecord<String, NotificationService> record = new ConsumerRecord<String, NotificationService>(
+ ConsumerRecord<String, NotificationService> record = new ConsumerRecord<>(
TOPIC, PARTITION, 0L, "key", NotificationServiceDataUtils.buildReceivedEvent());
mockConsumer.schedulePollTask(() -> {
mockConsumer.rebalance(Collections.singletonList(new TopicPartition(TOPIC, PARTITION)));
TopicPartition tp = new TopicPartition(TOPIC, PARTITION);
startOffsets.put(tp, 0L);
mockConsumer.updateBeginningOffsets(startOffsets);
- List<NotificationService> result = subscriber.subscribeService(TOPIC);
+ List<NotificationService> result = subscriberService.subscribe(TOPIC, NotificationService.QNAME);
assertEquals("There should be 1 record", 1, result.size());
assertTrue("Consumer should be closed", mockConsumer.closed());
}
+
+ @Test
+ public void subscribeAlarmShouldBeSuccessful() {
+ // from https://www.baeldung.com/kafka-mockconsumer
+ ConsumerRecord<String, NotificationAlarmService> record = new ConsumerRecord<>(
+ TOPIC, PARTITION, 0L, "key", NotificationServiceDataUtils.buildReceivedAlarmEvent());
+ mockConsumerAlarm.schedulePollTask(() -> {
+ mockConsumerAlarm.rebalance(Collections.singletonList(new TopicPartition(TOPIC, PARTITION)));
+ mockConsumerAlarm.addRecord(record);
+ });
+
+ Map<TopicPartition, Long> startOffsets = new HashMap<>();
+ TopicPartition tp = new TopicPartition(TOPIC, PARTITION);
+ startOffsets.put(tp, 0L);
+ mockConsumerAlarm.updateBeginningOffsets(startOffsets);
+ List<NotificationAlarmService> result = subscriberAlarmService.subscribe(TOPIC, NotificationAlarmService.QNAME);
+ assertEquals("There should be 1 record", 1, result.size());
+ assertTrue("Consumer should be closed", mockConsumerAlarm.closed());
+ }
}
import org.opendaylight.transportpce.common.converter.JsonStringConverter;
import org.opendaylight.transportpce.test.AbstractTest;
import org.opendaylight.yang.gen.v1.http.org.openroadm.common.service.types.rev190531.ConnectionType;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.GetNotificationsAlarmServiceInputBuilder;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.GetNotificationsAlarmServiceOutput;
import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.GetNotificationsServiceInputBuilder;
import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.GetNotificationsServiceOutput;
import org.opendaylight.yangtools.yang.common.RpcResult;
nbiNotificationsImpl = new NbiNotificationsImpl(converter, converterAlarm,"localhost:8080");
}
+ @Test
public void getNotificationsServiceEmptyDataTest() throws InterruptedException, ExecutionException {
ListenableFuture<RpcResult<GetNotificationsServiceOutput>> result =
nbiNotificationsImpl.getNotificationsService(new GetNotificationsServiceInputBuilder().build());
@Test
public void getNotificationsServiceTest() throws InterruptedException, ExecutionException {
- GetNotificationsServiceInputBuilder builder = new GetNotificationsServiceInputBuilder();
- builder.setGroupId("groupId");
- builder.setIdConsumer("consumerId");
- builder.setConnectionType(ConnectionType.Service);
+ GetNotificationsServiceInputBuilder builder = new GetNotificationsServiceInputBuilder()
+ .setGroupId("groupId")
+ .setIdConsumer("consumerId")
+ .setConnectionType(ConnectionType.Service);
ListenableFuture<RpcResult<GetNotificationsServiceOutput>> result =
nbiNotificationsImpl.getNotificationsService(builder.build());
assertNull("Should be null", result.get().getResult().getNotificationService());
}
+
+ @Test
+ public void getNotificationsAlarmServiceTest() throws InterruptedException, ExecutionException {
+ GetNotificationsAlarmServiceInputBuilder builder = new GetNotificationsAlarmServiceInputBuilder()
+ .setGroupId("groupId")
+ .setIdConsumer("consumerId")
+ .setConnectionType(ConnectionType.Service);
+ ListenableFuture<RpcResult<GetNotificationsAlarmServiceOutput>> result =
+ nbiNotificationsImpl.getNotificationsAlarmService(builder.build());
+ assertNull("Should be null", result.get().getResult().getNotificationAlarmService());
+ }
}
package org.opendaylight.transportpce.nbinotifications.listener;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.opendaylight.transportpce.nbinotifications.producer.Publisher;
-import org.opendaylight.transportpce.nbinotifications.producer.PublisherAlarm;
import org.opendaylight.transportpce.test.AbstractTest;
import org.opendaylight.yang.gen.v1.http.org.openroadm.common.service.types.rev190531.ConnectionType;
import org.opendaylight.yang.gen.v1.http.org.openroadm.common.state.types.rev181130.State;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.NotificationAlarmService;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.NotificationService;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.PublishNotificationAlarmService;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.PublishNotificationAlarmServiceBuilder;
import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.PublishNotificationService;
import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.PublishNotificationServiceBuilder;
public class NbiNotificationsListenerImplTest extends AbstractTest {
@Mock
- private Publisher publisher;
+ private Publisher<NotificationService> publisherService;
@Mock
- private PublisherAlarm publisherAlarm;
+ private Publisher<NotificationAlarmService> publisherAlarm;
@Before
public void setUp() {
@Test
public void onPublishNotificationServiceTest() {
- NbiNotificationsListenerImpl listener = new NbiNotificationsListenerImpl(Map.of("test", publisher),
+ NbiNotificationsListenerImpl listener = new NbiNotificationsListenerImpl(Map.of("test", publisherService),
Map.of("test", publisherAlarm));
PublishNotificationService notification = new PublishNotificationServiceBuilder().setTopic("test")
.setCommonId("commonId").setConnectionType(ConnectionType.Service).setMessage("Service deleted")
.setOperationalState(State.OutOfService).setServiceName("service name").build();
listener.onPublishNotificationService(notification);
- verify(publisher, times(1)).sendEvent(any());
+ verify(publisherService, times(1)).sendEvent(any(), anyString());
}
@Test
public void onPublishNotificationServiceWrongTopicTest() {
- NbiNotificationsListenerImpl listener = new NbiNotificationsListenerImpl(Map.of("test", publisher),
+ NbiNotificationsListenerImpl listener = new NbiNotificationsListenerImpl(Map.of("test", publisherService),
Map.of("test", publisherAlarm));
PublishNotificationService notification = new PublishNotificationServiceBuilder().setTopic("wrongtopic")
.setCommonId("commonId").setConnectionType(ConnectionType.Service).setMessage("Service deleted")
.setOperationalState(State.OutOfService).setServiceName("service name").build();
listener.onPublishNotificationService(notification);
- verify(publisher, times(0)).sendEvent(any());
+ verify(publisherService, times(0)).sendEvent(any(), anyString());
+ }
+
+ @Test
+ public void onPublishNotificationAlarmServiceTest() {
+ NbiNotificationsListenerImpl listener = new NbiNotificationsListenerImpl(Map.of("test", publisherService),
+ Map.of("test", publisherAlarm));
+ PublishNotificationAlarmService notification = new PublishNotificationAlarmServiceBuilder().setTopic("test")
+ .setConnectionType(ConnectionType.Service).setMessage("The service is now inService")
+ .setOperationalState(State.OutOfService).setServiceName("service name").build();
+ listener.onPublishNotificationAlarmService(notification);
+ verify(publisherAlarm, times(1)).sendEvent(any(), anyString());
+ }
+
+ @Test
+ public void onPublishNotificationAlarmServiceWrongTopicTest() {
+ NbiNotificationsListenerImpl listener = new NbiNotificationsListenerImpl(Map.of("test", publisherService),
+ Map.of("test", publisherAlarm));
+ PublishNotificationAlarmService notification = new PublishNotificationAlarmServiceBuilder()
+ .setTopic("wrongtopic").setConnectionType(ConnectionType.Service)
+ .setMessage("The service is now inService").setOperationalState(State.OutOfService)
+ .setServiceName("service name").build();
+ listener.onPublishNotificationAlarmService(notification);
+ verify(publisherAlarm, times(0)).sendEvent(any(), anyString());
}
}
import org.junit.Test;
import org.opendaylight.transportpce.common.converter.JsonStringConverter;
import org.opendaylight.transportpce.nbinotifications.serialization.ConfigConstants;
+import org.opendaylight.transportpce.nbinotifications.serialization.NotificationAlarmServiceSerializer;
import org.opendaylight.transportpce.nbinotifications.serialization.NotificationServiceSerializer;
import org.opendaylight.transportpce.test.AbstractTest;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.NotificationAlarmService;
import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.NotificationService;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.codec.gson.JSONCodecFactorySupplier;
public class PublisherTest extends AbstractTest {
- private JsonStringConverter<NotificationService> converter;
- private Publisher publisher;
+ private JsonStringConverter<NotificationService> converterService;
+ private JsonStringConverter<NotificationAlarmService> converterAlarm;
+ private Publisher<NotificationService> publisherService;
+ private Publisher<NotificationAlarmService> publisherAlarm;
private MockProducer<String, NotificationService> mockProducer;
+ private MockProducer<String, NotificationAlarmService> mockAlarmProducer;
@Before
public void setUp() {
- NotificationServiceSerializer serializer = new NotificationServiceSerializer();
- Map<String, Object> properties = Map.of(ConfigConstants.CONVERTER , serializer);
- serializer.configure(properties, false);
- mockProducer = new MockProducer<>(true, new StringSerializer(), serializer);
- converter = new JsonStringConverter<NotificationService>(
- getDataStoreContextUtil().getBindingDOMCodecServices());
- publisher = new Publisher("test",mockProducer);
+ NotificationServiceSerializer serializerService = new NotificationServiceSerializer();
+ NotificationAlarmServiceSerializer serializerAlarm = new NotificationAlarmServiceSerializer();
+ Map<String, Object> properties = Map.of(ConfigConstants.CONVERTER, serializerService);
+ Map<String, Object> propertiesAlarm = Map.of(ConfigConstants.CONVERTER, serializerAlarm);
+ serializerService.configure(properties, false);
+ serializerAlarm.configure(propertiesAlarm, false);
+ mockProducer = new MockProducer<>(true, new StringSerializer(), serializerService);
+ mockAlarmProducer = new MockProducer<>(true, new StringSerializer(), serializerAlarm);
+ converterService = new JsonStringConverter<>(getDataStoreContextUtil().getBindingDOMCodecServices());
+ converterAlarm = new JsonStringConverter<>(getDataStoreContextUtil().getBindingDOMCodecServices());
+ publisherService = new Publisher<>("test", mockProducer);
+ publisherAlarm = new Publisher<>("test", mockAlarmProducer);
}
@Test
- public void sendEventShouldBeSuccessful() throws IOException {
+ public void sendEventServiceShouldBeSuccessful() throws IOException {
String json = Files.readString(Paths.get("src/test/resources/event.json"));
- NotificationService notificationService = converter
+ NotificationService notificationService = converterService
.createDataObjectFromJsonString(YangInstanceIdentifier.of(NotificationService.QNAME),
json, JSONCodecFactorySupplier.RFC7951);
- publisher.sendEvent(notificationService);
+ publisherService.sendEvent(notificationService, notificationService.getConnectionType().name());
assertEquals("We should have one message", 1, mockProducer.history().size());
- assertEquals("Key should be test", "test",mockProducer.history().get(0).key());
+ assertEquals("Key should be test", "test", mockProducer.history().get(0).key());
}
+ @Test
+ public void sendEventAlarmShouldBeSuccessful() throws IOException {
+ String json = Files.readString(Paths.get("src/test/resources/event_alarm_service.json"));
+ NotificationAlarmService notificationAlarmService = converterAlarm
+ .createDataObjectFromJsonString(YangInstanceIdentifier.of(NotificationAlarmService.QNAME),
+ json, JSONCodecFactorySupplier.RFC7951);
+ publisherAlarm.sendEvent(notificationAlarmService, "alarm"
+ + notificationAlarmService.getConnectionType().getName());
+ assertEquals("We should have one message", 1, mockAlarmProducer.history().size());
+ assertEquals("Key should be test", "test", mockAlarmProducer.history().get(0).key());
+ }
}
--- /dev/null
+/*
+ * Copyright © 2021 Orange, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.transportpce.nbinotifications.serialization;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.Map;
+import org.junit.Test;
+import org.opendaylight.transportpce.common.converter.JsonStringConverter;
+import org.opendaylight.transportpce.test.AbstractTest;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.get.notifications.alarm.service.output.NotificationAlarmService;
+
+public class NotificationAlarmServiceDeserializerTest extends AbstractTest {
+
+ @Test
+ public void deserializeTest() throws IOException {
+ JsonStringConverter<org.opendaylight.yang.gen.v1
+ .nbi.notifications.rev210628.NotificationAlarmService> converter =
+ new JsonStringConverter<>(getDataStoreContextUtil().getBindingDOMCodecServices());
+ NotificationAlarmServiceDeserializer deserializer = new NotificationAlarmServiceDeserializer();
+ Map<String, Object> configs = Map.of(ConfigConstants.CONVERTER, converter);
+ deserializer.configure(configs, false);
+ NotificationAlarmService readEvent = deserializer.deserialize("Test",
+ Files.readAllBytes(Paths.get("src/test/resources/event_alarm_service.json")));
+ deserializer.close();
+ assertEquals("Service name should be service1", "service1", readEvent.getServiceName());
+ assertEquals("message should be The service is now inService", "The service is now inService",
+ readEvent.getMessage());
+ }
+}
--- /dev/null
+/*
+ * Copyright © 2021 Orange, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.transportpce.nbinotifications.serialization;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.Map;
+import org.junit.Test;
+import org.opendaylight.transportpce.common.converter.JsonStringConverter;
+import org.opendaylight.transportpce.test.AbstractTest;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.NotificationAlarmService;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.codec.gson.JSONCodecFactorySupplier;
+
+public class NotificationAlarmServiceSerializerTest extends AbstractTest {
+
+ @Test
+ public void serializeTest() throws IOException {
+ JsonStringConverter<NotificationAlarmService> converter =
+ new JsonStringConverter<>(getDataStoreContextUtil().getBindingDOMCodecServices());
+ String json = Files.readString(Paths.get("src/test/resources/event_alarm_service.json"));
+ NotificationAlarmService notificationAlarmService = converter
+ .createDataObjectFromJsonString(YangInstanceIdentifier.of(NotificationAlarmService.QNAME),
+ json, JSONCodecFactorySupplier.RFC7951);
+ NotificationAlarmServiceSerializer serializer = new NotificationAlarmServiceSerializer();
+ Map<String, Object> configs = Map.of(ConfigConstants.CONVERTER, converter);
+ serializer.configure(configs, false);
+ byte[] data = serializer.serialize("test", notificationAlarmService);
+ serializer.close();
+ assertNotNull("Serialized data should not be null", data);
+ String expectedJson = Files.readString(Paths.get("src/test/resources/expected_event_alarm_service.json"));
+ // Minify the json string
+ expectedJson = new ObjectMapper().readValue(expectedJson, JsonNode.class).toString();
+ assertEquals("The event should be equals", expectedJson, new String(data, StandardCharsets.UTF_8));
+ }
+}
return notificationServiceBuilder.build();
}
+ public static org.opendaylight.yang.gen.v1
+ .nbi.notifications.rev210628.get.notifications.alarm.service.output.NotificationAlarmService
+ buildReceivedAlarmEvent() {
+ org.opendaylight.yang.gen.v1
+ .nbi.notifications.rev210628.get.notifications.alarm.service.output.NotificationAlarmServiceBuilder
+ notificationAlarmServiceBuilder = new org.opendaylight.yang.gen.v1
+ .nbi.notifications.rev210628.get.notifications.alarm.service.output.NotificationAlarmServiceBuilder()
+ .setMessage("message")
+ .setServiceName("service1")
+ .setOperationalState(State.InService)
+ .setConnectionType(ConnectionType.Service);
+ return notificationAlarmServiceBuilder.build();
+ }
+
public static ServiceAEndBuilder getServiceAEndBuild() {
return new ServiceAEndBuilder()
.setClli("clli").setServiceFormat(ServiceFormat.OC).setServiceRate(Uint32.valueOf(1))
--- /dev/null
+{
+ "notification-alarm-service": {
+ "service-name": "service1",
+ "operational-state": "inService",
+ "message": "The service is now inService",
+ "connection-type": "service"
+ }
+}
--- /dev/null
+{
+ "notification-alarm-service": {
+ "operational-state": "inService",
+ "service-name": "service1",
+ "connection-type": "service",
+ "message": "The service is now inService"
+ }
+}
\ No newline at end of file
--- /dev/null
+/*
+ * Copyright © 2021 Orange. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.transportpce.servicehandler.listeners;
+
+import static org.junit.Assert.fail;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.Collection;
+import java.util.HashSet;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+import org.opendaylight.mdsal.binding.api.DataBroker;
+import org.opendaylight.mdsal.binding.api.DataObjectModification;
+import org.opendaylight.mdsal.binding.api.DataTreeModification;
+import org.opendaylight.mdsal.binding.api.NotificationPublishService;
+import org.opendaylight.yang.gen.v1.http.org.openroadm.common.node.types.rev181130.NodeIdType;
+import org.opendaylight.yang.gen.v1.http.org.openroadm.common.service.types.rev190531.ConnectionType;
+import org.opendaylight.yang.gen.v1.http.org.openroadm.common.service.types.rev190531.service.endpoint.RxDirection;
+import org.opendaylight.yang.gen.v1.http.org.openroadm.common.service.types.rev190531.service.endpoint.TxDirection;
+import org.opendaylight.yang.gen.v1.http.org.openroadm.common.service.types.rev190531.service.lgx.LgxBuilder;
+import org.opendaylight.yang.gen.v1.http.org.openroadm.common.service.types.rev190531.service.port.PortBuilder;
+import org.opendaylight.yang.gen.v1.http.org.openroadm.common.state.types.rev181130.State;
+import org.opendaylight.yang.gen.v1.http.org.openroadm.equipment.states.types.rev181130.AdminStates;
+import org.opendaylight.yang.gen.v1.http.org.openroadm.service.format.rev190531.ServiceFormat;
+import org.opendaylight.yang.gen.v1.http.org.openroadm.service.rev190531.service.list.Services;
+import org.opendaylight.yang.gen.v1.http.org.openroadm.service.rev190531.service.list.ServicesBuilder;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.PublishNotificationAlarmService;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.PublishNotificationAlarmServiceBuilder;
+import org.opendaylight.yangtools.yang.common.Uint32;
+
+@RunWith(MockitoJUnitRunner.StrictStubs.class)
+public class ServiceListenerTest {
+
+ @Mock
+ private DataBroker dataBroker;
+ @Mock
+ private NotificationPublishService notificationPublishService;
+
+ @Test
+ public void testOnDataTreeChangedWhenDeleteService() {
+ @SuppressWarnings("unchecked") final DataObjectModification<Services> service =
+ mock(DataObjectModification.class);
+ final Collection<DataTreeModification<Services>> changes = new HashSet<>();
+ @SuppressWarnings("unchecked") final DataTreeModification<Services> ch = mock(DataTreeModification.class);
+ changes.add(ch);
+ when(ch.getRootNode()).thenReturn(service);
+
+ when(service.getModificationType()).thenReturn(DataObjectModification.ModificationType.DELETE);
+ when(service.getDataBefore()).thenReturn(buildService(State.InService, AdminStates.InService));
+ ServiceListener listener = new ServiceListener(dataBroker, notificationPublishService);
+ listener.onDataTreeChanged(changes);
+ verify(ch, times(1)).getRootNode();
+ verify(service, times(1)).getModificationType();
+ verify(service, times(2)).getDataBefore();
+ verify(service, never()).getDataAfter();
+ try {
+ verify(notificationPublishService, never()).putNotification(any(PublishNotificationAlarmService.class));
+ } catch (InterruptedException e) {
+ fail("Failed publishing notification");
+ }
+ }
+
+ @Test
+ public void testOnDataTreeChangedWhenAddService() {
+ @SuppressWarnings("unchecked") final DataObjectModification<Services> service =
+ mock(DataObjectModification.class);
+ final Collection<DataTreeModification<Services>> changes = new HashSet<>();
+ @SuppressWarnings("unchecked") final DataTreeModification<Services> ch = mock(DataTreeModification.class);
+ changes.add(ch);
+ when(ch.getRootNode()).thenReturn(service);
+
+ Services serviceDown = buildService(State.OutOfService, AdminStates.OutOfService);
+ when(service.getModificationType()).thenReturn(DataObjectModification.ModificationType.WRITE);
+ when(service.getDataBefore()).thenReturn(buildService(State.InService, AdminStates.InService));
+ when(service.getDataAfter()).thenReturn(serviceDown);
+ ServiceListener listener = new ServiceListener(dataBroker, notificationPublishService);
+ listener.onDataTreeChanged(changes);
+ verify(ch, times(1)).getRootNode();
+ verify(service, times(1)).getModificationType();
+ verify(service, times(3)).getDataBefore();
+ verify(service, times(2)).getDataAfter();
+ PublishNotificationAlarmService publishNotificationAlarmService =
+ buildNotificationAlarmService(serviceDown, "The service is now outOfService");
+ try {
+ verify(notificationPublishService, times(1))
+ .putNotification(publishNotificationAlarmService);
+ } catch (InterruptedException e) {
+ fail("Failed publishing notification");
+ }
+ }
+
+ @Test
+ public void testOnDataTreeChangedWhenShouldNeverHappen() {
+ @SuppressWarnings("unchecked") final DataObjectModification<Services> service =
+ mock(DataObjectModification.class);
+ final Collection<DataTreeModification<Services>> changes = new HashSet<>();
+ @SuppressWarnings("unchecked") final DataTreeModification<Services> ch = mock(DataTreeModification.class);
+ changes.add(ch);
+ when(ch.getRootNode()).thenReturn(service);
+
+ when(service.getModificationType()).thenReturn(DataObjectModification.ModificationType.SUBTREE_MODIFIED);
+ when(service.getDataBefore()).thenReturn(buildService(State.InService, AdminStates.InService));
+ ServiceListener listener = new ServiceListener(dataBroker, notificationPublishService);
+ listener.onDataTreeChanged(changes);
+ verify(ch, times(1)).getRootNode();
+ verify(service, times(2)).getModificationType();
+ verify(service, times(2)).getDataBefore();
+ verify(service, never()).getDataAfter();
+ try {
+ verify(notificationPublishService, never()).putNotification(any(PublishNotificationAlarmService.class));
+ } catch (InterruptedException e) {
+ fail("Failed publishing notification");
+ }
+ }
+
+ private Services buildService(State state, AdminStates adminStates) {
+ org.opendaylight.yang.gen.v1.http.org.openroadm.common.service.types.rev190531.service.ServiceAEnd
+ serviceAEnd = getServiceAEndBuild()
+ .build();
+ org.opendaylight.yang.gen.v1.http.org.openroadm.common.service.types.rev190531.service
+ .ServiceZEnd serviceZEnd = new org.opendaylight.yang.gen.v1
+ .http.org.openroadm.common.service.types.rev190531.service.ServiceZEndBuilder()
+ .setClli("clli").setServiceFormat(ServiceFormat.OC).setServiceRate(Uint32.valueOf(1))
+ .setNodeId(new NodeIdType("XPONDER-3-2"))
+ .setTxDirection(getTxDirection())
+ .setRxDirection(getRxDirection())
+ .build();
+
+ ServicesBuilder builtInput = new ServicesBuilder()
+ .setCommonId("commonId")
+ .setConnectionType(ConnectionType.Service)
+ .setCustomer("Customer")
+ .setServiceName("service 1")
+ .setServiceAEnd(serviceAEnd)
+ .setServiceZEnd(serviceZEnd)
+ .setOperationalState(state)
+ .setAdministrativeState(adminStates);
+
+ return builtInput.build();
+ }
+
+ private org.opendaylight.yang.gen.v1
+ .http.org.openroadm.common.service.types.rev190531.service.ServiceAEndBuilder getServiceAEndBuild() {
+ return new org.opendaylight.yang.gen.v1.http.org.openroadm.common.service.types.rev190531.service
+ .ServiceAEndBuilder()
+ .setClli("clli").setServiceFormat(ServiceFormat.OC).setServiceRate(Uint32.valueOf(1))
+ .setNodeId(new NodeIdType("XPONDER-1-2"))
+ .setTxDirection(getTxDirection())
+ .setRxDirection(getRxDirection());
+ }
+
+ private TxDirection getTxDirection() {
+ return new org.opendaylight.yang.gen.v1.http.org.openroadm.common.service.types.rev190531.service
+ .endpoint.TxDirectionBuilder().setPort(new PortBuilder().setPortDeviceName("device name")
+ .setPortName("port name").setPortRack("port rack").setPortShelf("port shelf")
+ .setPortSlot("port slot").setPortSubSlot("port subslot").setPortType("port type").build())
+ .setLgx(new LgxBuilder().setLgxDeviceName("lgx device name").setLgxPortName("lgx port name")
+ .setLgxPortRack("lgx port rack").setLgxPortShelf("lgx port shelf").build())
+ .build();
+ }
+
+ private RxDirection getRxDirection() {
+ return new org.opendaylight.yang.gen.v1.http.org.openroadm.common.service.types.rev190531.service
+ .endpoint.RxDirectionBuilder()
+ .setPort(new PortBuilder().setPortDeviceName("device name").setPortName("port name")
+ .setPortRack("port rack").setPortShelf("port shelf").setPortSlot("port slot")
+ .setPortSubSlot("port subslot").setPortType("port type").build())
+ .setLgx(new LgxBuilder().setLgxDeviceName("lgx device name")
+ .setLgxPortName("lgx port name").setLgxPortRack("lgx port rack")
+ .setLgxPortShelf("lgx port shelf").build())
+ .build();
+ }
+
+ private PublishNotificationAlarmService buildNotificationAlarmService(Services services, String message) {
+ return new PublishNotificationAlarmServiceBuilder()
+ .setServiceName("service 1")
+ .setConnectionType(ConnectionType.Service)
+ .setMessage(message)
+ .setOperationalState(services.getOperationalState())
+ .setTopic("ServiceListener")
+ .build();
+ }
+}
# pylint: disable=too-many-public-methods
import os
+import json
import sys
import unittest
import time
self.assertEqual(res['output']['notification-service'][-1]['message'], 'Service implemented !')
time.sleep(2)
- def test_17_delete_eth_service1(self):
+ def test_17_get_notifications_alarm_service1(self):
+ data = {
+ "input": {
+ "connection-type": "service",
+ "id-consumer": "consumer",
+ "group-id": "transportpceTest"
+ }
+ }
+ response = test_utils.get_notifications_alarm_service_request(data)
+ self.assertEqual(response.status_code, requests.codes.ok)
+ res = response.json()
+ self.assertEqual(res['output']['notification-alarm-service'][-1]['service-name'], 'service1')
+ self.assertEqual(res['output']['notification-alarm-service'][-1]['connection-type'], 'service')
+ self.assertEqual(res['output']['notification-alarm-service'][-1]['operational-state'], 'inService')
+ self.assertEqual(res['output']['notification-alarm-service'][-1]['message'], 'The service is now inService')
+ time.sleep(2)
+
+ def test_18_change_status_port_roadma_srg(self):
+ url = "{}/config/org-openroadm-device:org-openroadm-device/circuit-packs/3%2F0/ports/C1"
+ body = {"ports": [{
+ "port-name": "C1",
+ "logical-connection-point": "SRG1-PP1",
+ "port-type": "client",
+ "circuit-id": "SRG1",
+ "administrative-state": "outOfService",
+ "port-qual": "roadm-external"}]}
+ response = requests.request("PUT", url.format("http://127.0.0.1:8141/restconf"),
+ data=json.dumps(body), headers=test_utils.TYPE_APPLICATION_JSON,
+ auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
+ self.assertEqual(response.status_code, requests.codes.ok)
+ time.sleep(2)
+
+ def test_19_get_notifications_alarm_service1(self):
+ data = {
+ "input": {
+ "connection-type": "service",
+ "id-consumer": "consumer",
+ "group-id": "transportpceTest"
+ }
+ }
+ response = test_utils.get_notifications_alarm_service_request(data)
+ self.assertEqual(response.status_code, requests.codes.ok)
+ res = response.json()
+ self.assertEqual(res['output']['notification-alarm-service'][-1]['service-name'], 'service1')
+ self.assertEqual(res['output']['notification-alarm-service'][-1]['connection-type'], 'service')
+ self.assertEqual(res['output']['notification-alarm-service'][-1]['operational-state'], 'outOfService')
+ self.assertEqual(res['output']['notification-alarm-service'][-1]['message'], 'The service is now outOfService')
+ time.sleep(2)
+
+ def test_20_restore_status_port_roadma_srg(self):
+ url = "{}/config/org-openroadm-device:org-openroadm-device/circuit-packs/3%2F0/ports/C1"
+ body = {"ports": [{
+ "port-name": "C1",
+ "logical-connection-point": "SRG1-PP1",
+ "port-type": "client",
+ "circuit-id": "SRG1",
+ "administrative-state": "inService",
+ "port-qual": "roadm-external"}]}
+ response = requests.request("PUT", url.format("http://127.0.0.1:8141/restconf"),
+ data=json.dumps(body), headers=test_utils.TYPE_APPLICATION_JSON,
+ auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
+ self.assertEqual(response.status_code, requests.codes.ok)
+ time.sleep(2)
+
+ def test_21_get_notifications_alarm_service1(self):
+ self.test_17_get_notifications_alarm_service1()
+
+ def test_22_delete_eth_service1(self):
response = test_utils.service_delete_request("service1")
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
res['output']['configuration-response-common']['response-message'])
time.sleep(20)
- def test_18_get_notifications_service1(self):
+ def test_23_get_notifications_service1(self):
data = {
"input": {
"connection-type": "service",
self.assertEqual(res['output']['notification-service'][-1]['message'], 'Service deleted !')
time.sleep(2)
- def test_19_disconnect_XPDRA(self):
+ def test_24_disconnect_XPDRA(self):
response = test_utils.unmount_device("XPDR-A1")
self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
- def test_20_disconnect_XPDRC(self):
+ def test_25_disconnect_XPDRC(self):
response = test_utils.unmount_device("XPDR-C1")
self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
- def test_21_disconnect_ROADMA(self):
+ def test_26_disconnect_ROADMA(self):
response = test_utils.unmount_device("ROADM-A1")
self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
- def test_22_disconnect_ROADMC(self):
+ def test_27_disconnect_ROADMC(self):
response = test_utils.unmount_device("ROADM-C1")
self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
URL_PORTMAPPING = "{}/config/transportpce-portmapping:network/nodes/"
URL_OPER_SERV_LIST = "{}/operational/org-openroadm-service:service-list/"
URL_GET_NBINOTIFICATIONS_SERV = "{}/operations/nbi-notifications:get-notifications-service/"
+URL_GET_NBINOTIFICATIONS_ALARM_SERV = "{}/operations/nbi-notifications:get-notifications-alarm-service/"
URL_SERV_CREATE = "{}/operations/org-openroadm-service:service-create"
URL_SERV_DELETE = "{}/operations/org-openroadm-service:service-delete"
URL_SERVICE_PATH = "{}/operations/transportpce-device-renderer:service-path"
return post_request(URL_GET_NBINOTIFICATIONS_SERV, attr)
+def get_notifications_alarm_service_request(attr):
+ return post_request(URL_GET_NBINOTIFICATIONS_ALARM_SERV, attr)
+
+
def get_service_list_request(suffix: str):
url = URL_OPER_SERV_LIST + suffix
return get_request(url)