Add new Maven module to manage NBI Notifications 70/94270/27
authorThierry Jiao <thierry.jiao@orange.com>
Thu, 17 Dec 2020 17:04:59 +0000 (18:04 +0100)
committerguillaume.lambert <guillaume.lambert@orange.com>
Thu, 11 Mar 2021 15:47:15 +0000 (16:47 +0100)
- Implement a new Maven module named nbinotifications containing classes
  capable of communicating with Kafka server
- Add the class Subscriber capable of reading events from topics Kafka
- Add the class Publisher capable of writing events from topics Kafka
- Add yang file nbi-notifications to model the service notifications
- Implement a new RPC API named GetNotificationsService capable of
  returning the notifications stored in a topic Kafka
- Implement a new RPC API named publishNotificationService in charge of
  publishing a message in Kafka topic
- Add a listener to the RPC GetNotificationsService
- Add new blueprint in nbinotifications
- Add unit tests to check the functionning of nbinotifications classes
- Update pom.xml to implement the new module nbinotifications

JIRA: TRNSPRTPCE-343
Signed-off-by: Thierry Jiao <thierry.jiao@orange.com>
Change-Id: I74298292034e4a158b27dd5db47e63918026684b

27 files changed:
api/src/main/yang/nbi-notifications@2020-11-30.yang [new file with mode: 0644]
nbinotifications/pom.xml [new file with mode: 0644]
nbinotifications/src/main/java/org/opendaylight/transportpce/nbinotifications/consumer/Subscriber.java [new file with mode: 0644]
nbinotifications/src/main/java/org/opendaylight/transportpce/nbinotifications/impl/NbiNotificationsImpl.java [new file with mode: 0644]
nbinotifications/src/main/java/org/opendaylight/transportpce/nbinotifications/impl/NbiNotificationsProvider.java [new file with mode: 0644]
nbinotifications/src/main/java/org/opendaylight/transportpce/nbinotifications/listener/NbiNotificationsListenerImpl.java [new file with mode: 0644]
nbinotifications/src/main/java/org/opendaylight/transportpce/nbinotifications/producer/Publisher.java [new file with mode: 0644]
nbinotifications/src/main/java/org/opendaylight/transportpce/nbinotifications/serialization/ConfigConstants.java [new file with mode: 0644]
nbinotifications/src/main/java/org/opendaylight/transportpce/nbinotifications/serialization/NotificationServiceDeserializer.java [new file with mode: 0644]
nbinotifications/src/main/java/org/opendaylight/transportpce/nbinotifications/serialization/NotificationServiceSerializer.java [new file with mode: 0644]
nbinotifications/src/main/java/org/opendaylight/transportpce/nbinotifications/utils/NbiNotificationsUtils.java [new file with mode: 0644]
nbinotifications/src/main/resources/OSGI-INF/blueprint/nobinotifications-blueprint.xml [new file with mode: 0644]
nbinotifications/src/main/resources/publisher.properties [new file with mode: 0644]
nbinotifications/src/main/resources/subscriber.properties [new file with mode: 0644]
nbinotifications/src/test/java/org/opendaylight/transportpce/nbinotifications/consumer/SubscriberTest.java [new file with mode: 0644]
nbinotifications/src/test/java/org/opendaylight/transportpce/nbinotifications/impl/NbiNotificationsImplTest.java [new file with mode: 0644]
nbinotifications/src/test/java/org/opendaylight/transportpce/nbinotifications/impl/NbiNotificationsProviderTest.java [new file with mode: 0644]
nbinotifications/src/test/java/org/opendaylight/transportpce/nbinotifications/listener/NbiNotificationsListenerImplTest.java [new file with mode: 0644]
nbinotifications/src/test/java/org/opendaylight/transportpce/nbinotifications/producer/PublisherTest.java [new file with mode: 0644]
nbinotifications/src/test/java/org/opendaylight/transportpce/nbinotifications/serialization/NotificationServiceDeserializerTest.java [new file with mode: 0644]
nbinotifications/src/test/java/org/opendaylight/transportpce/nbinotifications/serialization/NotificationServiceSerializerTest.java [new file with mode: 0644]
nbinotifications/src/test/java/org/opendaylight/transportpce/nbinotifications/utils/NotificationServiceDataUtils.java [new file with mode: 0644]
nbinotifications/src/test/resources/event.json [new file with mode: 0644]
nbinotifications/src/test/resources/expected_event.json [new file with mode: 0644]
nbinotifications/src/test/resources/publisher.properties [new file with mode: 0644]
nbinotifications/src/test/resources/subscriber.properties [new file with mode: 0644]
pom.xml

diff --git a/api/src/main/yang/nbi-notifications@2020-11-30.yang b/api/src/main/yang/nbi-notifications@2020-11-30.yang
new file mode 100644 (file)
index 0000000..aa068cd
--- /dev/null
@@ -0,0 +1,120 @@
+module nbi-notifications {
+  yang-version 1;
+  namespace "nbi-notifications";
+  prefix nbinotifications;
+
+  import org-openroadm-service {
+    prefix oor-service;
+    revision-date 2019-05-31;
+  }
+  import org-openroadm-common-service-types {
+    prefix org-openroadm-common-service-types;
+    revision-date 2019-05-31;
+  }
+  import org-openroadm-common-state-types {
+    prefix org-openroadm-common-state-types;
+    revision-date 2018-11-30;
+  }
+
+  organization
+    "transportPCE";
+  contact
+    "transportPCE committers - ODL";
+  description
+    "YANG definitions for using REST API in NBI notifications module. Copyright
+     (c) 2020 ORANGE and others. All rights reserved.";
+
+  revision 2020-11-30 {
+    description
+      "Initial revision of NBI notifications";
+  }
+
+  grouping notification-service {
+    leaf message {
+      type string;
+      mandatory true;
+      description
+        "Message for the specified service";
+    }
+    leaf service-name {
+      type string;
+      mandatory true;
+      description
+        "Identifier for the service to be created in the ROADM network, e.g., CLFI, CLCI, etc.
+        This is reported against the service, but may not get reflected in the service in the network.";
+    }
+    leaf common-id {
+      type string;
+      description
+        "To be used by the ROADM controller to identify the routing constraints
+        received from planning application (PED).";
+    }
+    leaf connection-type {
+      type org-openroadm-common-service-types:connection-type;
+      mandatory true;
+    }
+    container service-a-end {
+      uses org-openroadm-common-service-types:service-endpoint;
+    }
+    container service-z-end {
+      uses org-openroadm-common-service-types:service-endpoint;
+    }
+    leaf response-failed {
+      type string;
+      description
+        "Response of the error if the service request encountered an anomaly";
+    }
+    leaf operational-state {
+      type org-openroadm-common-state-types:state;
+      config false;
+      description
+        "Operational State: Actual state of service";
+    }
+  }
+
+  container notification-service {
+    description
+      "Model used to send a notification from a service request";
+    uses notification-service;
+  }
+
+  rpc get-notifications-service {
+    description "Get the notifications service send by ServiceHandler by filtering through connection type";
+    input {
+      leaf connection-type {
+        type org-openroadm-common-service-types:connection-type;
+        mandatory true;
+        description
+          "Type connection of the service ";
+      }
+      leaf id-consumer {
+        type string;
+        mandatory true;
+        description
+          "Unique ID for the consumer";
+      }
+      leaf group-id {
+        type string;
+        mandatory true;
+        description
+          "ID Group for the consumer";
+      }
+    }
+    output {
+      list notification-service {
+        uses notification-service;
+      }
+    }
+  }
+
+  notification publish-notification-service {
+    description "Publish the notifications service for topic";
+    leaf topic {
+      type string;
+      mandatory true;
+      description
+        "Topic where to send the notification service";
+     }
+     uses notification-service;
+  }
+}
diff --git a/nbinotifications/pom.xml b/nbinotifications/pom.xml
new file mode 100644 (file)
index 0000000..e49c0f1
--- /dev/null
@@ -0,0 +1,71 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!-- Copyright © 2020 Orange and others. All rights reserved. This program
+    and the accompanying materials are made available under the terms of the
+    Eclipse Public License v1.0 which accompanies this distribution, and is available
+    at http://www.eclipse.org/legal/epl-v10.html INTERNAL -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.opendaylight.mdsal</groupId>
+        <artifactId>binding-parent</artifactId>
+        <version>7.0.5</version>
+        <relativePath />
+    </parent>
+
+    <groupId>org.opendaylight.transportpce</groupId>
+    <artifactId>transportpce-nbinotifications</artifactId>
+    <version>3.0.0-SNAPSHOT</version>
+    <packaging>bundle</packaging>
+
+    <properties>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+        <kafka.version>2.6.0</kafka.version>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>${project.groupId}</groupId>
+            <artifactId>transportpce-common</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>${project.groupId}</groupId>
+            <artifactId>transportpce-api</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>${project.groupId}.ordmodels</groupId>
+            <artifactId>transportpce-ordmodels-service</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka-clients</artifactId>
+            <version>${kafka.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.opendaylight.mdsal</groupId>
+            <artifactId>mdsal-binding-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.opendaylight.mdsal</groupId>
+            <artifactId>mdsal-binding-dom-adapter</artifactId>
+        </dependency>
+        <!-- Testing Dependencies -->
+        <dependency>
+            <groupId>${project.groupId}</groupId>
+            <artifactId>test-common</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-databind</artifactId>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+
+</project>
diff --git a/nbinotifications/src/main/java/org/opendaylight/transportpce/nbinotifications/consumer/Subscriber.java b/nbinotifications/src/main/java/org/opendaylight/transportpce/nbinotifications/consumer/Subscriber.java
new file mode 100644 (file)
index 0000000..cb044b1
--- /dev/null
@@ -0,0 +1,72 @@
+/*
+ * Copyright © 2020 Orange, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.transportpce.nbinotifications.consumer;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.opendaylight.transportpce.common.converter.JsonStringConverter;
+import org.opendaylight.transportpce.nbinotifications.serialization.ConfigConstants;
+import org.opendaylight.transportpce.nbinotifications.serialization.NotificationServiceDeserializer;
+import org.opendaylight.transportpce.nbinotifications.utils.NbiNotificationsUtils;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.get.notifications.service.output.NotificationService;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class Subscriber {
+    private static final Logger LOG = LoggerFactory.getLogger(Subscriber.class);
+
+    private final Consumer<String, NotificationService> consumer;
+
+    public Subscriber(String id, String groupId, String suscriberServer,
+            JsonStringConverter<org.opendaylight.yang.gen.v1
+                .nbi.notifications.rev201130.NotificationService> deserializer) {
+        Properties propsConsumer = NbiNotificationsUtils.loadProperties("subscriber.properties");
+        propsConsumer.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
+        propsConsumer.put(ConsumerConfig.CLIENT_ID_CONFIG, id);
+        propsConsumer.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+        propsConsumer.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG , NotificationServiceDeserializer.class);
+        propsConsumer.put(ConfigConstants.CONVERTER , deserializer);
+        if (suscriberServer != null && !suscriberServer.isBlank()) {
+            propsConsumer.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, suscriberServer);
+        }
+        LOG.info("Suscribing for group id {}, client config id {} with properties {}", groupId, id, propsConsumer);
+        consumer = new KafkaConsumer<>(propsConsumer);
+    }
+
+    public List<NotificationService> subscribeService(String topicName) {
+        LOG.info("Subscribe request to topic '{}' ", topicName);
+        consumer.subscribe(Collections.singleton(topicName));
+        final ConsumerRecords<String, NotificationService> consumerRecords = consumer.poll(Duration.ofMillis(1000));
+        List<NotificationService> notificationServiceList = new ArrayList<>();
+        YangInstanceIdentifier.of(NotificationService.QNAME);
+        for (ConsumerRecord<String, NotificationService> record : consumerRecords) {
+            if (record.value() != null) {
+                notificationServiceList.add(record.value());
+            }
+        }
+        LOG.info("Getting records '{}' ", notificationServiceList);
+        consumer.unsubscribe();
+        consumer.close();
+        return notificationServiceList;
+    }
+
+    @VisibleForTesting public Subscriber(Consumer<String, NotificationService> consumer) {
+        this.consumer = consumer;
+    }
+}
diff --git a/nbinotifications/src/main/java/org/opendaylight/transportpce/nbinotifications/impl/NbiNotificationsImpl.java b/nbinotifications/src/main/java/org/opendaylight/transportpce/nbinotifications/impl/NbiNotificationsImpl.java
new file mode 100644 (file)
index 0000000..073ce49
--- /dev/null
@@ -0,0 +1,51 @@
+/*
+ * Copyright © 2020 Orange, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.transportpce.nbinotifications.impl;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import java.util.List;
+import org.opendaylight.transportpce.common.converter.JsonStringConverter;
+import org.opendaylight.transportpce.nbinotifications.consumer.Subscriber;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.GetNotificationsServiceInput;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.GetNotificationsServiceOutput;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.GetNotificationsServiceOutputBuilder;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.NbiNotificationsService;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.get.notifications.service.output.NotificationService;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class NbiNotificationsImpl implements NbiNotificationsService {
+    private static final Logger LOG = LoggerFactory.getLogger(NbiNotificationsImpl.class);
+    private final JsonStringConverter<org.opendaylight.yang.gen.v1
+        .nbi.notifications.rev201130.NotificationService> converter;
+    private final String server;
+
+    public NbiNotificationsImpl(JsonStringConverter<org.opendaylight.yang.gen.v1
+            .nbi.notifications.rev201130.NotificationService> converter, String server) {
+        this.converter = converter;
+        this.server = server;
+    }
+
+    @Override
+    public ListenableFuture<RpcResult<GetNotificationsServiceOutput>> getNotificationsService(
+            GetNotificationsServiceInput input) {
+        LOG.info("RPC getNotificationsService received");
+        if (input == null || input.getIdConsumer() == null || input.getGroupId() == null) {
+            LOG.warn("Missing mandatory params for input {}", input);
+            return RpcResultBuilder.success(new GetNotificationsServiceOutputBuilder().build()).buildFuture();
+        }
+        Subscriber subscriber = new Subscriber(input.getIdConsumer(), input.getGroupId(), server, converter);
+        List<NotificationService> notificationServiceList = subscriber
+                .subscribeService(input.getConnectionType().getName());
+        GetNotificationsServiceOutputBuilder output = new GetNotificationsServiceOutputBuilder()
+                .setNotificationService(notificationServiceList);
+        return RpcResultBuilder.success(output.build()).buildFuture();
+    }
+}
diff --git a/nbinotifications/src/main/java/org/opendaylight/transportpce/nbinotifications/impl/NbiNotificationsProvider.java b/nbinotifications/src/main/java/org/opendaylight/transportpce/nbinotifications/impl/NbiNotificationsProvider.java
new file mode 100644 (file)
index 0000000..5f41034
--- /dev/null
@@ -0,0 +1,77 @@
+/*
+ * Copyright © 2020 Orange, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.transportpce.nbinotifications.impl;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.opendaylight.mdsal.binding.api.NotificationService;
+import org.opendaylight.mdsal.binding.api.RpcProviderService;
+import org.opendaylight.mdsal.binding.dom.codec.spi.BindingDOMCodecServices;
+import org.opendaylight.transportpce.common.converter.JsonStringConverter;
+import org.opendaylight.transportpce.nbinotifications.listener.NbiNotificationsListenerImpl;
+import org.opendaylight.transportpce.nbinotifications.producer.Publisher;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.NbiNotificationsListener;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.NbiNotificationsService;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.concepts.ObjectRegistration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class NbiNotificationsProvider {
+
+    private static final Logger LOG = LoggerFactory.getLogger(NbiNotificationsProvider.class);
+    private static Map<String, Publisher> publishersMap =  new HashMap<>();
+
+    private final RpcProviderService rpcService;
+    private ObjectRegistration<NbiNotificationsService> rpcRegistration;
+    private ListenerRegistration<NbiNotificationsListener> listenerRegistration;
+    private NotificationService notificationService;
+    private final JsonStringConverter<org.opendaylight.yang.gen.v1
+        .nbi.notifications.rev201130.NotificationService> converter;
+    private final String suscriberServer;
+
+
+    public NbiNotificationsProvider(List<String> topics,
+            String suscriberServer, String publisherServer,
+            RpcProviderService rpcProviderService, NotificationService notificationService,
+            BindingDOMCodecServices bindingDOMCodecServices) {
+        this.rpcService = rpcProviderService;
+        this.notificationService = notificationService;
+        converter =  new JsonStringConverter<>(bindingDOMCodecServices);
+        for (String topic: topics) {
+            LOG.info("Creating publisher for topic {}", topic);
+            publishersMap.put(topic, new Publisher(topic, publisherServer, converter));
+        }
+        this.suscriberServer = suscriberServer;
+    }
+
+    /**
+     * Method called when the blueprint container is created.
+     */
+    public void init() {
+        LOG.info("NbiNotificationsProvider Session Initiated");
+        rpcRegistration = rpcService.registerRpcImplementation(NbiNotificationsService.class,
+                new NbiNotificationsImpl(converter, suscriberServer));
+        listenerRegistration = notificationService.registerNotificationListener(
+                new NbiNotificationsListenerImpl(publishersMap));
+    }
+
+    /**
+     * Method called when the blueprint container is destroyed.
+     */
+    public void close() {
+        for (Publisher publisher : publishersMap.values()) {
+            publisher.close();
+        }
+        rpcRegistration.close();
+        listenerRegistration.close();
+        LOG.info("NbiNotificationsProvider Closed");
+    }
+
+}
diff --git a/nbinotifications/src/main/java/org/opendaylight/transportpce/nbinotifications/listener/NbiNotificationsListenerImpl.java b/nbinotifications/src/main/java/org/opendaylight/transportpce/nbinotifications/listener/NbiNotificationsListenerImpl.java
new file mode 100644 (file)
index 0000000..0bcdf6b
--- /dev/null
@@ -0,0 +1,45 @@
+/*
+ * Copyright © 2020 Orange, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.transportpce.nbinotifications.listener;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.opendaylight.transportpce.nbinotifications.producer.Publisher;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.NbiNotificationsListener;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.NotificationServiceBuilder;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.PublishNotificationService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class NbiNotificationsListenerImpl implements NbiNotificationsListener {
+    private static final Logger LOG = LoggerFactory.getLogger(NbiNotificationsListenerImpl.class);
+    private Map<String, Publisher> publishersMap =  new HashMap<>();
+
+    public NbiNotificationsListenerImpl(Map<String, Publisher> publishersMap) {
+        this.publishersMap = publishersMap;
+    }
+
+    @Override
+    public void onPublishNotificationService(PublishNotificationService notification) {
+        LOG.info("Receiving request for publishing notification service");
+        String topic = notification.getTopic();
+        if (!publishersMap.containsKey(topic)) {
+            LOG.error("Unknown topic {}", topic);
+            return;
+        }
+        Publisher publisher = publishersMap.get(topic);
+        publisher.sendEvent(new NotificationServiceBuilder().setCommonId(notification.getCommonId())
+                .setConnectionType(notification.getConnectionType()).setMessage(notification.getMessage())
+                .setOperationalState(notification.getOperationalState())
+                .setResponseFailed(notification.getResponseFailed())
+                .setServiceAEnd(notification.getServiceAEnd()).setServiceName(notification.getServiceName())
+                .setServiceZEnd(notification.getServiceZEnd()).build());
+
+    }
+
+}
diff --git a/nbinotifications/src/main/java/org/opendaylight/transportpce/nbinotifications/producer/Publisher.java b/nbinotifications/src/main/java/org/opendaylight/transportpce/nbinotifications/producer/Publisher.java
new file mode 100644 (file)
index 0000000..21295a9
--- /dev/null
@@ -0,0 +1,59 @@
+/*
+ * Copyright © 2020 Orange, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.transportpce.nbinotifications.producer;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.util.Properties;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.opendaylight.transportpce.common.converter.JsonStringConverter;
+import org.opendaylight.transportpce.nbinotifications.serialization.ConfigConstants;
+import org.opendaylight.transportpce.nbinotifications.serialization.NotificationServiceSerializer;
+import org.opendaylight.transportpce.nbinotifications.utils.NbiNotificationsUtils;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.NotificationService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class Publisher {
+    private static final Logger LOG = LoggerFactory.getLogger(Publisher.class);
+
+    private final String id;
+    private final Producer<String, NotificationService> producer;
+
+    public Publisher(String id, String publisherServer, JsonStringConverter<NotificationService> serializer) {
+        Properties properties = NbiNotificationsUtils.loadProperties("publisher.properties");
+        properties.put(ProducerConfig.CLIENT_ID_CONFIG, id);
+        if (publisherServer != null && !publisherServer.isBlank()) {
+            properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, publisherServer);
+        }
+        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG , NotificationServiceSerializer.class);
+        properties.put(ConfigConstants.CONVERTER , serializer);
+        LOG.info("Creationg publisher for id {} with properties {}", id, properties);
+        producer = new KafkaProducer<>(properties);
+        this.id = id;
+    }
+
+    @VisibleForTesting Publisher(String id, Producer<String, NotificationService> producer) {
+        this.producer = producer;
+        this.id = id;
+    }
+
+    public void close() {
+        producer.close();
+    }
+
+    public void sendEvent(NotificationService notificationService) {
+        LOG.info("SendEvent request to topic '{}' ", notificationService.getConnectionType().getName());
+        producer.send(new ProducerRecord<>(notificationService.getConnectionType().getName(), id, notificationService));
+        producer.flush();
+    }
+}
diff --git a/nbinotifications/src/main/java/org/opendaylight/transportpce/nbinotifications/serialization/ConfigConstants.java b/nbinotifications/src/main/java/org/opendaylight/transportpce/nbinotifications/serialization/ConfigConstants.java
new file mode 100644 (file)
index 0000000..b10b016
--- /dev/null
@@ -0,0 +1,16 @@
+/*
+ * Copyright © 2021 Orange, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.transportpce.nbinotifications.serialization;
+
+public final class ConfigConstants {
+    private ConfigConstants() {
+    }
+
+    public static final String CONVERTER = "converter";
+
+}
diff --git a/nbinotifications/src/main/java/org/opendaylight/transportpce/nbinotifications/serialization/NotificationServiceDeserializer.java b/nbinotifications/src/main/java/org/opendaylight/transportpce/nbinotifications/serialization/NotificationServiceDeserializer.java
new file mode 100644 (file)
index 0000000..95f10fb
--- /dev/null
@@ -0,0 +1,67 @@
+/*
+ * Copyright © 2021 Orange, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.transportpce.nbinotifications.serialization;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.opendaylight.transportpce.common.converter.JsonStringConverter;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.get.notifications.service.output.NotificationService;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.get.notifications.service.output.NotificationServiceBuilder;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.codec.gson.JSONCodecFactorySupplier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class NotificationServiceDeserializer implements Deserializer<NotificationService> {
+    private static final Logger LOG = LoggerFactory.getLogger(NotificationServiceDeserializer.class);
+    private JsonStringConverter<org.opendaylight.yang.gen.v1
+        .nbi.notifications.rev201130.NotificationService> converter;
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public void configure(Map<String, ?> configs, boolean isKey) {
+        LOG.info("Deserializer configuration {}", configs);
+        if (configs.containsKey(ConfigConstants.CONVERTER)
+                && configs.get(ConfigConstants.CONVERTER) instanceof JsonStringConverter<?>) {
+            converter = (JsonStringConverter<org.opendaylight.yang.gen.v1
+                    .nbi.notifications.rev201130.NotificationService>) configs
+                    .get(ConfigConstants.CONVERTER);
+        }
+    }
+
+    @Override
+    public NotificationService deserialize(String topic, byte[] data) {
+        if (converter == null) {
+            throw new IllegalArgumentException(
+                    "Converter should be configured through configure method of deserializer");
+        }
+        String value = new String(data, StandardCharsets.UTF_8);
+        // The message published is
+        // org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.NotificationService
+        // we have to map it to
+        // org.opendaylight.yang.gen
+        // .v1.nbi.notifications.rev201130.get.notifications.service.output.NotificationService
+        org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.NotificationService mappedString = converter
+                .createDataObjectFromJsonString(YangInstanceIdentifier.of(
+                        org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.NotificationService.QNAME),
+                        value,
+                        JSONCodecFactorySupplier.RFC7951);
+        if (mappedString != null) {
+            LOG.info("Reading event {}", mappedString);
+            return new NotificationServiceBuilder().setCommonId(mappedString.getCommonId())
+                    .setConnectionType(mappedString.getConnectionType()).setMessage(mappedString.getMessage())
+                    .setOperationalState(mappedString.getOperationalState())
+                    .setResponseFailed(mappedString.getResponseFailed()).setServiceName(mappedString.getServiceName())
+                    .setServiceAEnd(mappedString.getServiceAEnd()).setServiceZEnd(mappedString.getServiceZEnd())
+                    .build();
+        }
+        return null;
+    }
+
+}
diff --git a/nbinotifications/src/main/java/org/opendaylight/transportpce/nbinotifications/serialization/NotificationServiceSerializer.java b/nbinotifications/src/main/java/org/opendaylight/transportpce/nbinotifications/serialization/NotificationServiceSerializer.java
new file mode 100644 (file)
index 0000000..eb99874
--- /dev/null
@@ -0,0 +1,53 @@
+/*
+ * Copyright © 2021 Orange, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.transportpce.nbinotifications.serialization;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+import org.apache.kafka.common.serialization.Serializer;
+import org.opendaylight.transportpce.common.converter.JsonStringConverter;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.NotificationService;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.codec.gson.JSONCodecFactorySupplier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class NotificationServiceSerializer implements Serializer<NotificationService> {
+    private static final Logger LOG = LoggerFactory.getLogger(NotificationServiceSerializer.class);
+    private JsonStringConverter<NotificationService> converter;
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public void configure(Map<String, ?> configs, boolean isKey) {
+        LOG.info("Deserializer configuration {}", configs);
+        if (configs.containsKey(ConfigConstants.CONVERTER)
+                && configs.get(ConfigConstants.CONVERTER) instanceof JsonStringConverter<?>) {
+            converter = (JsonStringConverter<NotificationService>) configs.get(ConfigConstants.CONVERTER);
+        }
+    }
+
+    @Override
+    public byte[] serialize(String topic, NotificationService data) {
+        if (converter == null) {
+            throw new IllegalArgumentException(
+                    "Converter should be" + "configured through configure method of serializer");
+        }
+        if (data == null) {
+            return new byte[0];
+        }
+        try {
+            InstanceIdentifier<NotificationService> iid = InstanceIdentifier.builder(NotificationService.class).build();
+            String serialized = converter.createJsonStringFromDataObject(iid, data, JSONCodecFactorySupplier.RFC7951);
+            LOG.info("Serialized event {}", serialized);
+            return serialized.getBytes(StandardCharsets.UTF_8);
+        } catch (IOException e) {
+            return new byte[0];
+        }
+    }
+}
diff --git a/nbinotifications/src/main/java/org/opendaylight/transportpce/nbinotifications/utils/NbiNotificationsUtils.java b/nbinotifications/src/main/java/org/opendaylight/transportpce/nbinotifications/utils/NbiNotificationsUtils.java
new file mode 100644 (file)
index 0000000..d1a4cee
--- /dev/null
@@ -0,0 +1,38 @@
+/*
+ * Copyright © 2020 Orange, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.transportpce.nbinotifications.utils;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Properties;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public final class NbiNotificationsUtils {
+
+    private static final Logger LOG = LoggerFactory.getLogger(NbiNotificationsUtils.class);
+
+    private NbiNotificationsUtils() {
+    }
+
+    public static Properties loadProperties(String propertyFileName) {
+        Properties props = new Properties();
+        InputStream inputStream = NbiNotificationsUtils.class.getClassLoader()
+                .getResourceAsStream(propertyFileName);
+        try {
+            if (inputStream != null) {
+                props.load(inputStream);
+            } else {
+                LOG.warn("Kafka property file '{}' is empty", propertyFileName);
+            }
+        } catch (IOException e) {
+            LOG.warn("Kafka property file '{}' was not found in the classpath", propertyFileName);
+        }
+        return props;
+    }
+}
diff --git a/nbinotifications/src/main/resources/OSGI-INF/blueprint/nobinotifications-blueprint.xml b/nbinotifications/src/main/resources/OSGI-INF/blueprint/nobinotifications-blueprint.xml
new file mode 100644 (file)
index 0000000..9b3a3bd
--- /dev/null
@@ -0,0 +1,35 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!-- vi: set et smarttab sw=4 tabstop=4: -->
+<!-- Copyright © 2020 Orange and others. All rights reserved. This program and the accompanying materials
+    are made available under the terms of the Eclipse Public License v1.0 which accompanies this distribution,
+    and is available at http://www.eclipse.org/legal/epl-v10.html -->
+<blueprint xmlns="http://www.osgi.org/xmlns/blueprint/v1.0.0" xmlns:odl="http://opendaylight.org/xmlns/blueprint/v1.0.0"
+xmlns:cm="http://aries.apache.org/blueprint/xmlns/blueprint-cm/v1.1.0"
+    odl:use-default-for-reference-types="true">
+    <cm:property-placeholder persistent-id="org.opendaylight.transportpce.nbinotifications" update-strategy="reload">
+        <cm:default-properties>
+            <cm:property name="suscriber.server" value="" />
+            <cm:property name="publisher.server" value="" />
+        </cm:default-properties>
+    </cm:property-placeholder>
+    <reference id="rpcService" interface="org.opendaylight.mdsal.binding.api.RpcProviderService"/>
+    <reference id="notificationService" interface="org.opendaylight.mdsal.binding.api.NotificationService"/>
+    <reference id="bindingDOMCodecServices" interface="org.opendaylight.mdsal.binding.dom.codec.spi.BindingDOMCodecServices" />
+    <bean id="provider"
+        class="org.opendaylight.transportpce.nbinotifications.impl.NbiNotificationsProvider"
+        init-method="init" destroy-method="close">
+        <argument>
+            <list value-type="java.lang.String">
+                <value>PceListener</value>
+                <value>ServiceHandlerOperations</value>
+                <value>ServiceHandler</value>
+                <value>RendererListener</value>
+            </list>
+        </argument>
+        <argument value="${suscriber.server}"/>
+        <argument value="${publisher.server}"/>
+        <argument ref="rpcService" />
+        <argument ref="notificationService" />
+        <argument ref="bindingDOMCodecServices" />
+    </bean>
+</blueprint>
diff --git a/nbinotifications/src/main/resources/publisher.properties b/nbinotifications/src/main/resources/publisher.properties
new file mode 100644 (file)
index 0000000..ecbc156
--- /dev/null
@@ -0,0 +1,8 @@
+#Kafka Producer/AdminClient properties
+bootstrap.servers=localhost:9092
+acks=all
+retries=3
+max.in.flight.requests.per.connection=1
+batch.size=16384
+linger.ms=1
+buffer.memory=33554432
diff --git a/nbinotifications/src/main/resources/subscriber.properties b/nbinotifications/src/main/resources/subscriber.properties
new file mode 100644 (file)
index 0000000..0c3e96e
--- /dev/null
@@ -0,0 +1,5 @@
+#Kafka Consumer properties
+bootstrap.servers=localhost:9092
+enable.auto.commit=true
+auto.commit.interval.ms=1000
+auto.offset.reset=earliest
diff --git a/nbinotifications/src/test/java/org/opendaylight/transportpce/nbinotifications/consumer/SubscriberTest.java b/nbinotifications/src/test/java/org/opendaylight/transportpce/nbinotifications/consumer/SubscriberTest.java
new file mode 100644 (file)
index 0000000..de58a49
--- /dev/null
@@ -0,0 +1,57 @@
+/*
+ * Copyright © 2020 Orange, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.transportpce.nbinotifications.consumer;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.MockConsumer;
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.common.TopicPartition;
+import org.junit.Before;
+import org.junit.Test;
+import org.opendaylight.transportpce.nbinotifications.utils.NotificationServiceDataUtils;
+import org.opendaylight.transportpce.test.AbstractTest;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.get.notifications.service.output.NotificationService;
+
+public class SubscriberTest extends AbstractTest {
+    private static final String TOPIC = "topic";
+    private static final int PARTITION = 0;
+    private MockConsumer<String, NotificationService> mockConsumer;
+    private Subscriber subscriber;
+
+    @Before
+    public void setUp() {
+        mockConsumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
+        subscriber = new Subscriber(mockConsumer);
+    }
+
+    @Test
+    public void subscribeServiceShouldBeSuccessful() {
+        // from https://www.baeldung.com/kafka-mockconsumer
+        ConsumerRecord<String, NotificationService> record = new ConsumerRecord<String, NotificationService>(
+                TOPIC, PARTITION, 0L, "key", NotificationServiceDataUtils.buildReceivedEvent());
+        mockConsumer.schedulePollTask(() -> {
+            mockConsumer.rebalance(Collections.singletonList(new TopicPartition(TOPIC, PARTITION)));
+            mockConsumer.addRecord(record);
+        });
+
+        Map<TopicPartition, Long> startOffsets = new HashMap<>();
+        TopicPartition tp = new TopicPartition(TOPIC, PARTITION);
+        startOffsets.put(tp, 0L);
+        mockConsumer.updateBeginningOffsets(startOffsets);
+        List<NotificationService> result = subscriber.subscribeService(TOPIC);
+        assertEquals("There should be 1 record", 1, result.size());
+        assertTrue("Consumer should be closed", mockConsumer.closed());
+    }
+}
diff --git a/nbinotifications/src/test/java/org/opendaylight/transportpce/nbinotifications/impl/NbiNotificationsImplTest.java b/nbinotifications/src/test/java/org/opendaylight/transportpce/nbinotifications/impl/NbiNotificationsImplTest.java
new file mode 100644 (file)
index 0000000..da82550
--- /dev/null
@@ -0,0 +1,50 @@
+/*
+ * Copyright © 2020 Orange, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.transportpce.nbinotifications.impl;
+
+import static org.junit.Assert.assertNull;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import java.util.concurrent.ExecutionException;
+import org.junit.Before;
+import org.junit.Test;
+import org.opendaylight.transportpce.common.converter.JsonStringConverter;
+import org.opendaylight.transportpce.test.AbstractTest;
+import org.opendaylight.yang.gen.v1.http.org.openroadm.common.service.types.rev190531.ConnectionType;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.GetNotificationsServiceInputBuilder;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.GetNotificationsServiceOutput;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+
+public class NbiNotificationsImplTest extends AbstractTest {
+    private NbiNotificationsImpl nbiNotificationsImpl;
+
+    @Before
+    public void setUp() {
+        JsonStringConverter<org.opendaylight.yang.gen.v1
+            .nbi.notifications.rev201130.NotificationService> converter = new JsonStringConverter<>(
+                getDataStoreContextUtil().getBindingDOMCodecServices());
+        nbiNotificationsImpl = new NbiNotificationsImpl(converter, "localhost:8080");
+    }
+
+    public void getNotificationsServiceEmptyDataTest() throws InterruptedException, ExecutionException {
+        ListenableFuture<RpcResult<GetNotificationsServiceOutput>> result =
+                nbiNotificationsImpl.getNotificationsService(new GetNotificationsServiceInputBuilder().build());
+        assertNull("Should be null", result.get().getResult().getNotificationService());
+    }
+
+    @Test
+    public void getNotificationsServiceTest() throws InterruptedException, ExecutionException {
+        GetNotificationsServiceInputBuilder builder = new GetNotificationsServiceInputBuilder();
+        builder.setGroupId("groupId");
+        builder.setIdConsumer("consumerId");
+        builder.setConnectionType(ConnectionType.Service);
+        ListenableFuture<RpcResult<GetNotificationsServiceOutput>> result =
+                nbiNotificationsImpl.getNotificationsService(builder.build());
+        assertNull("Should be null", result.get().getResult().getNotificationService());
+    }
+}
diff --git a/nbinotifications/src/test/java/org/opendaylight/transportpce/nbinotifications/impl/NbiNotificationsProviderTest.java b/nbinotifications/src/test/java/org/opendaylight/transportpce/nbinotifications/impl/NbiNotificationsProviderTest.java
new file mode 100644 (file)
index 0000000..178fc42
--- /dev/null
@@ -0,0 +1,50 @@
+/*
+ * Copyright © 2020 Orange, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.transportpce.nbinotifications.impl;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import java.util.Arrays;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.opendaylight.mdsal.binding.api.NotificationService;
+import org.opendaylight.mdsal.binding.api.RpcProviderService;
+import org.opendaylight.transportpce.nbinotifications.listener.NbiNotificationsListenerImpl;
+import org.opendaylight.transportpce.test.AbstractTest;
+
+public class NbiNotificationsProviderTest  extends AbstractTest {
+
+    @Mock
+    RpcProviderService rpcProviderRegistry;
+
+    @Mock
+    private NotificationService notificationService;
+
+    @Before
+    public void init() {
+        MockitoAnnotations.openMocks(this);
+
+    }
+
+    @Test
+    public void initTest() {
+        NbiNotificationsProvider provider = new NbiNotificationsProvider(
+                Arrays.asList("topic1", "topic2"), "localhost:8080", "localhost:8080",
+                rpcProviderRegistry, notificationService,
+                getDataStoreContextUtil().getBindingDOMCodecServices());
+        provider.init();
+        verify(rpcProviderRegistry, times(1))
+                .registerRpcImplementation(any(), any(NbiNotificationsImpl.class));
+        verify(notificationService, times(1))
+                .registerNotificationListener(any(NbiNotificationsListenerImpl.class));
+    }
+}
diff --git a/nbinotifications/src/test/java/org/opendaylight/transportpce/nbinotifications/listener/NbiNotificationsListenerImplTest.java b/nbinotifications/src/test/java/org/opendaylight/transportpce/nbinotifications/listener/NbiNotificationsListenerImplTest.java
new file mode 100644 (file)
index 0000000..cb8faf2
--- /dev/null
@@ -0,0 +1,54 @@
+/*
+ * Copyright © 2021 Orange, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.transportpce.nbinotifications.listener;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import java.util.Map;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.opendaylight.transportpce.nbinotifications.producer.Publisher;
+import org.opendaylight.transportpce.test.AbstractTest;
+import org.opendaylight.yang.gen.v1.http.org.openroadm.common.service.types.rev190531.ConnectionType;
+import org.opendaylight.yang.gen.v1.http.org.openroadm.common.state.types.rev181130.State;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.PublishNotificationService;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.PublishNotificationServiceBuilder;
+
+public class NbiNotificationsListenerImplTest extends AbstractTest {
+    @Mock
+    private Publisher publisher;
+
+    @Before
+    public void setUp() {
+        MockitoAnnotations.openMocks(this);
+    }
+
+    @Test
+    public void onPublishNotificationServiceTest() {
+        NbiNotificationsListenerImpl listener = new NbiNotificationsListenerImpl(Map.of("test", publisher));
+        PublishNotificationService notification = new PublishNotificationServiceBuilder().setTopic("test")
+                .setCommonId("commonId").setConnectionType(ConnectionType.Service).setMessage("Service deleted")
+                .setOperationalState(State.OutOfService).setServiceName("service name").build();
+        listener.onPublishNotificationService(notification);
+        verify(publisher, times(1)).sendEvent(any());
+    }
+
+    @Test
+    public void onPublishNotificationServiceWrongTopicTest() {
+        NbiNotificationsListenerImpl listener = new NbiNotificationsListenerImpl(Map.of("test", publisher));
+        PublishNotificationService notification = new PublishNotificationServiceBuilder().setTopic("wrongtopic")
+                .setCommonId("commonId").setConnectionType(ConnectionType.Service).setMessage("Service deleted")
+                .setOperationalState(State.OutOfService).setServiceName("service name").build();
+        listener.onPublishNotificationService(notification);
+        verify(publisher, times(0)).sendEvent(any());
+    }
+}
diff --git a/nbinotifications/src/test/java/org/opendaylight/transportpce/nbinotifications/producer/PublisherTest.java b/nbinotifications/src/test/java/org/opendaylight/transportpce/nbinotifications/producer/PublisherTest.java
new file mode 100644 (file)
index 0000000..b3a5b2e
--- /dev/null
@@ -0,0 +1,55 @@
+/*
+ * Copyright © 2020 Orange, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.transportpce.nbinotifications.producer;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.Map;
+import org.apache.kafka.clients.producer.MockProducer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.junit.Before;
+import org.junit.Test;
+import org.opendaylight.transportpce.common.converter.JsonStringConverter;
+import org.opendaylight.transportpce.nbinotifications.serialization.ConfigConstants;
+import org.opendaylight.transportpce.nbinotifications.serialization.NotificationServiceSerializer;
+import org.opendaylight.transportpce.test.AbstractTest;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.NotificationService;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.codec.gson.JSONCodecFactorySupplier;
+
+public class PublisherTest extends AbstractTest {
+    private JsonStringConverter<NotificationService> converter;
+    private Publisher publisher;
+    private MockProducer<String, NotificationService> mockProducer;
+
+    @Before
+    public void setUp() {
+        NotificationServiceSerializer serializer = new NotificationServiceSerializer();
+        Map<String, Object> properties = Map.of(ConfigConstants.CONVERTER , serializer);
+        serializer.configure(properties, false);
+        mockProducer =  new MockProducer<>(true, new StringSerializer(), serializer);
+        converter = new JsonStringConverter<NotificationService>(
+                getDataStoreContextUtil().getBindingDOMCodecServices());
+        publisher = new Publisher("test",mockProducer);
+    }
+
+    @Test
+    public void sendEventShouldBeSuccessful() throws IOException {
+        String json = Files.readString(Paths.get("src/test/resources/event.json"));
+        NotificationService notificationService = converter
+                .createDataObjectFromJsonString(YangInstanceIdentifier.of(NotificationService.QNAME),
+                        json, JSONCodecFactorySupplier.RFC7951);
+        publisher.sendEvent(notificationService);
+        assertEquals("We should have one message", 1, mockProducer.history().size());
+        assertEquals("Key should be test", "test",mockProducer.history().get(0).key());
+    }
+
+}
diff --git a/nbinotifications/src/test/java/org/opendaylight/transportpce/nbinotifications/serialization/NotificationServiceDeserializerTest.java b/nbinotifications/src/test/java/org/opendaylight/transportpce/nbinotifications/serialization/NotificationServiceDeserializerTest.java
new file mode 100644 (file)
index 0000000..928c247
--- /dev/null
@@ -0,0 +1,35 @@
+/*
+ * Copyright © 2021 Orange, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.transportpce.nbinotifications.serialization;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.Map;
+import org.junit.Test;
+import org.opendaylight.transportpce.common.converter.JsonStringConverter;
+import org.opendaylight.transportpce.test.AbstractTest;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.get.notifications.service.output.NotificationService;
+
+public class NotificationServiceDeserializerTest extends AbstractTest {
+
+    @Test
+    public void deserializeTest() throws IOException {
+        JsonStringConverter<org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.NotificationService> converter =
+                new JsonStringConverter<>(getDataStoreContextUtil().getBindingDOMCodecServices());
+        NotificationServiceDeserializer deserializer = new NotificationServiceDeserializer();
+        Map<String, Object> configs = Map.of(ConfigConstants.CONVERTER, converter);
+        deserializer.configure(configs, false);
+        NotificationService readEvent = deserializer.deserialize("Test",
+                Files.readAllBytes(Paths.get("src/test/resources/event.json")));
+        deserializer.close();
+        assertEquals("Service name should be service1", "service1", readEvent.getServiceName());
+    }
+}
diff --git a/nbinotifications/src/test/java/org/opendaylight/transportpce/nbinotifications/serialization/NotificationServiceSerializerTest.java b/nbinotifications/src/test/java/org/opendaylight/transportpce/nbinotifications/serialization/NotificationServiceSerializerTest.java
new file mode 100644 (file)
index 0000000..3fe658b
--- /dev/null
@@ -0,0 +1,44 @@
+/*
+ * Copyright © 2021 Orange, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.transportpce.nbinotifications.serialization;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.Map;
+import org.junit.Test;
+import org.opendaylight.transportpce.common.converter.JsonStringConverter;
+import org.opendaylight.transportpce.test.AbstractTest;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.NotificationService;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.codec.gson.JSONCodecFactorySupplier;
+
+public class NotificationServiceSerializerTest extends AbstractTest {
+
+    @Test
+    public void serializeTest() throws IOException {
+        JsonStringConverter<NotificationService> converter =
+                new JsonStringConverter<>(getDataStoreContextUtil().getBindingDOMCodecServices());
+        String json = Files.readString(Paths.get("src/test/resources/event.json"));
+        NotificationService notificationService = converter
+                .createDataObjectFromJsonString(YangInstanceIdentifier.of(NotificationService.QNAME),
+                        json, JSONCodecFactorySupplier.RFC7951);
+        NotificationServiceSerializer serializer = new NotificationServiceSerializer();
+        Map<String, Object> configs = Map.of(ConfigConstants.CONVERTER, converter);
+        serializer.configure(configs, false);
+        byte[] data = serializer.serialize("test", notificationService);
+        serializer.close();
+        assertNotNull("Serialized data should not be null", data);
+        String expectedJson = Files.readString(Paths.get("src/test/resources/expected_event.json"));
+        assertEquals("The event should be equals", expectedJson, new String(data, StandardCharsets.UTF_8));
+    }
+}
diff --git a/nbinotifications/src/test/java/org/opendaylight/transportpce/nbinotifications/utils/NotificationServiceDataUtils.java b/nbinotifications/src/test/java/org/opendaylight/transportpce/nbinotifications/utils/NotificationServiceDataUtils.java
new file mode 100644 (file)
index 0000000..44b6958
--- /dev/null
@@ -0,0 +1,98 @@
+/*
+ * Copyright © 2020 Orange, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.transportpce.nbinotifications.utils;
+
+import org.opendaylight.yang.gen.v1.http.org.openroadm.common.node.types.rev181130.NodeIdType;
+import org.opendaylight.yang.gen.v1.http.org.openroadm.common.service.types.rev190531.ConnectionType;
+import org.opendaylight.yang.gen.v1.http.org.openroadm.common.service.types.rev190531.service.endpoint.RxDirection;
+import org.opendaylight.yang.gen.v1.http.org.openroadm.common.service.types.rev190531.service.endpoint.TxDirection;
+import org.opendaylight.yang.gen.v1.http.org.openroadm.common.service.types.rev190531.service.lgx.LgxBuilder;
+import org.opendaylight.yang.gen.v1.http.org.openroadm.common.service.types.rev190531.service.port.PortBuilder;
+import org.opendaylight.yang.gen.v1.http.org.openroadm.common.state.types.rev181130.State;
+import org.opendaylight.yang.gen.v1.http.org.openroadm.service.format.rev190531.ServiceFormat;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.NotificationService;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.NotificationServiceBuilder;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.notification.service.ServiceAEndBuilder;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.notification.service.ServiceZEndBuilder;
+import org.opendaylight.yangtools.yang.common.Uint32;
+
+public final class NotificationServiceDataUtils {
+
+    private NotificationServiceDataUtils() {
+    }
+
+    public static NotificationService buildSendEventInput() {
+        NotificationServiceBuilder notificationServiceBuilder = new NotificationServiceBuilder()
+                .setMessage("message")
+                .setServiceName("service1")
+                .setOperationalState(State.InService)
+                .setResponseFailed("")
+                .setCommonId("commond-id")
+                .setConnectionType(ConnectionType.Service)
+                .setServiceZEnd(getServiceZEndBuild().build())
+                .setServiceAEnd(getServiceAEndBuild().build());
+
+        return notificationServiceBuilder.build();
+    }
+
+    public static org.opendaylight.yang.gen.v1
+        .nbi.notifications.rev201130.get.notifications.service.output.NotificationService buildReceivedEvent() {
+        org.opendaylight.yang.gen.v1
+            .nbi.notifications.rev201130.get.notifications.service.output.NotificationServiceBuilder
+            notificationServiceBuilder = new org.opendaylight.yang.gen.v1
+            .nbi.notifications.rev201130.get.notifications.service.output.NotificationServiceBuilder()
+                .setMessage("message")
+                .setServiceName("service1")
+                .setOperationalState(State.InService)
+                .setResponseFailed("")
+                .setCommonId("commond-id")
+                .setConnectionType(ConnectionType.Service)
+                .setServiceZEnd(getServiceZEndBuild().build())
+                .setServiceAEnd(getServiceAEndBuild().build());
+
+        return notificationServiceBuilder.build();
+    }
+
+    public static ServiceAEndBuilder getServiceAEndBuild() {
+        return new ServiceAEndBuilder()
+                .setClli("clli").setServiceFormat(ServiceFormat.OC).setServiceRate(Uint32.valueOf(1))
+                .setNodeId(new NodeIdType("XPONDER-1-2"))
+                .setTxDirection(getTxDirection())
+                .setRxDirection(getRxDirection());
+    }
+
+    public static ServiceZEndBuilder getServiceZEndBuild() {
+        return new ServiceZEndBuilder()
+                .setClli("clli").setServiceFormat(ServiceFormat.OC).setServiceRate(Uint32.valueOf(1))
+                .setNodeId(new NodeIdType("XPONDER-1-2"))
+                .setTxDirection(getTxDirection())
+                .setRxDirection(getRxDirection());
+    }
+
+    private static TxDirection getTxDirection() {
+        return new org.opendaylight.yang.gen.v1.http.org.openroadm.common.service.types.rev190531.service
+                .endpoint.TxDirectionBuilder().setPort(new PortBuilder().setPortDeviceName("device name")
+                .setPortName("port name").setPortRack("port rack").setPortShelf("port shelf")
+                .setPortSlot("port slot").setPortSubSlot("port subslot").setPortType("port type").build())
+                .setLgx(new LgxBuilder().setLgxDeviceName("lgx device name").setLgxPortName("lgx port name")
+                        .setLgxPortRack("lgx port rack").setLgxPortShelf("lgx port shelf").build())
+                .build();
+    }
+
+    private static RxDirection getRxDirection() {
+        return new org.opendaylight.yang.gen.v1.http.org.openroadm.common.service.types.rev190531.service
+                .endpoint.RxDirectionBuilder()
+                .setPort(new PortBuilder().setPortDeviceName("device name").setPortName("port name")
+                        .setPortRack("port rack").setPortShelf("port shelf").setPortSlot("port slot")
+                        .setPortSubSlot("port subslot").setPortType("port type").build())
+                .setLgx(new LgxBuilder().setLgxDeviceName("lgx device name")
+                        .setLgxPortName("lgx port name").setLgxPortRack("lgx port rack")
+                        .setLgxPortShelf("lgx port shelf").build())
+                .build();
+    }
+}
diff --git a/nbinotifications/src/test/resources/event.json b/nbinotifications/src/test/resources/event.json
new file mode 100644 (file)
index 0000000..63c7bec
--- /dev/null
@@ -0,0 +1,90 @@
+{
+    "notification-service": {
+        "service-name": "service1",
+        "service-a-end": {
+            "service-format": "OC",
+            "node-id": "XPONDER-1-2",
+            "service-rate": 1,
+            "clli": "clli",
+            "tx-direction": {
+                "port": {
+                    "port-slot": "port slot",
+                    "port-device-name": "device name",
+                    "port-rack": "port rack",
+                    "port-shelf": "port shelf",
+                    "port-type": "port type",
+                    "port-sub-slot": "port subslot",
+                    "port-name": "port name"
+                },
+                "lgx": {
+                    "lgx-port-name": "lgx port name",
+                    "lgx-port-shelf": "lgx port shelf",
+                    "lgx-port-rack": "lgx port rack",
+                    "lgx-device-name": "lgx device name"
+                }
+            },
+            "rx-direction": {
+                "port": {
+                    "port-slot": "port slot",
+                    "port-device-name": "device name",
+                    "port-rack": "port rack",
+                    "port-shelf": "port shelf",
+                    "port-type": "port type",
+                    "port-sub-slot": "port subslot",
+                    "port-name": "port name"
+                },
+                "lgx": {
+                    "lgx-port-name": "lgx port name",
+                    "lgx-port-shelf": "lgx port shelf",
+                    "lgx-port-rack": "lgx port rack",
+                    "lgx-device-name": "lgx device name"
+                }
+            }
+        },
+        "common-id": "commond-id",
+        "operational-state": "inService",
+        "connection-type": "service",
+        "service-z-end": {
+            "service-format": "OC",
+            "node-id": "XPONDER-1-2",
+            "service-rate": 1,
+            "clli": "clli",
+            "tx-direction": {
+                "port": {
+                    "port-slot": "port slot",
+                    "port-device-name": "device name",
+                    "port-rack": "port rack",
+                    "port-shelf": "port shelf",
+                    "port-type": "port type",
+                    "port-sub-slot": "port subslot",
+                    "port-name": "port name"
+                },
+                "lgx": {
+                    "lgx-port-name": "lgx port name",
+                    "lgx-port-shelf": "lgx port shelf",
+                    "lgx-port-rack": "lgx port rack",
+                    "lgx-device-name": "lgx device name"
+                }
+            },
+            "rx-direction": {
+                "port": {
+                    "port-slot": "port slot",
+                    "port-device-name": "device name",
+                    "port-rack": "port rack",
+                    "port-shelf": "port shelf",
+                    "port-type": "port type",
+                    "port-sub-slot": "port subslot",
+                    "port-name": "port name"
+                },
+                "lgx": {
+                    "lgx-port-name": "lgx port name",
+                    "lgx-port-shelf": "lgx port shelf",
+                    "lgx-port-rack": "lgx port rack",
+                    "lgx-device-name": "lgx device name"
+                }
+            }
+        },
+        "message": "message",
+        "response-failed": ""
+    }
+}
diff --git a/nbinotifications/src/test/resources/expected_event.json b/nbinotifications/src/test/resources/expected_event.json
new file mode 100644 (file)
index 0000000..2b8924d
--- /dev/null
@@ -0,0 +1 @@
+{"notification-service":{"service-name":"service1","service-a-end":{"service-format":"OC","node-id":"XPONDER-1-2","service-rate":1,"clli":"clli","tx-direction":{"port":{"port-slot":"port slot","port-device-name":"device name","port-rack":"port rack","port-shelf":"port shelf","port-type":"port type","port-sub-slot":"port subslot","port-name":"port name"},"lgx":{"lgx-port-name":"lgx port name","lgx-port-shelf":"lgx port shelf","lgx-port-rack":"lgx port rack","lgx-device-name":"lgx device name"}},"rx-direction":{"port":{"port-slot":"port slot","port-device-name":"device name","port-rack":"port rack","port-shelf":"port shelf","port-type":"port type","port-sub-slot":"port subslot","port-name":"port name"},"lgx":{"lgx-port-name":"lgx port name","lgx-port-shelf":"lgx port shelf","lgx-port-rack":"lgx port rack","lgx-device-name":"lgx device name"}}},"common-id":"commond-id","operational-state":"inService","connection-type":"service","service-z-end":{"service-format":"OC","node-id":"XPONDER-1-2","service-rate":1,"clli":"clli","tx-direction":{"port":{"port-slot":"port slot","port-device-name":"device name","port-rack":"port rack","port-shelf":"port shelf","port-type":"port type","port-sub-slot":"port subslot","port-name":"port name"},"lgx":{"lgx-port-name":"lgx port name","lgx-port-shelf":"lgx port shelf","lgx-port-rack":"lgx port rack","lgx-device-name":"lgx device name"}},"rx-direction":{"port":{"port-slot":"port slot","port-device-name":"device name","port-rack":"port rack","port-shelf":"port shelf","port-type":"port type","port-sub-slot":"port subslot","port-name":"port name"},"lgx":{"lgx-port-name":"lgx port name","lgx-port-shelf":"lgx port shelf","lgx-port-rack":"lgx port rack","lgx-device-name":"lgx device name"}}},"message":"message","response-failed":""}}
\ No newline at end of file
diff --git a/nbinotifications/src/test/resources/publisher.properties b/nbinotifications/src/test/resources/publisher.properties
new file mode 100644 (file)
index 0000000..ecbc156
--- /dev/null
@@ -0,0 +1,8 @@
+#Kafka Producer/AdminClient properties
+bootstrap.servers=localhost:9092
+acks=all
+retries=3
+max.in.flight.requests.per.connection=1
+batch.size=16384
+linger.ms=1
+buffer.memory=33554432
diff --git a/nbinotifications/src/test/resources/subscriber.properties b/nbinotifications/src/test/resources/subscriber.properties
new file mode 100644 (file)
index 0000000..0c3e96e
--- /dev/null
@@ -0,0 +1,5 @@
+#Kafka Consumer properties
+bootstrap.servers=localhost:9092
+enable.auto.commit=true
+auto.commit.interval.ms=1000
+auto.offset.reset=earliest
diff --git a/pom.xml b/pom.xml
index 7d26a84342d4e27c0efadb078e1794bb9afe1849..f8f8eea917ddbfa2cd50bf5e08be10e6a273ab4c 100644 (file)
--- a/pom.xml
+++ b/pom.xml
@@ -42,6 +42,7 @@ and is available at http://www.eclipse.org/legal/epl-v10.html INTERNAL
         <module>pce</module>
         <module>servicehandler</module>
         <module>tapi</module>
+        <module>nbinotifications</module>
         <module>features</module>
         <module>karaf</module>
     </modules>