--- /dev/null
+module nbi-notifications {
+ yang-version 1;
+ namespace "nbi-notifications";
+ prefix nbinotifications;
+
+ import org-openroadm-service {
+ prefix oor-service;
+ revision-date 2019-05-31;
+ }
+ import org-openroadm-common-service-types {
+ prefix org-openroadm-common-service-types;
+ revision-date 2019-05-31;
+ }
+ import org-openroadm-common-state-types {
+ prefix org-openroadm-common-state-types;
+ revision-date 2018-11-30;
+ }
+
+ organization
+ "transportPCE";
+ contact
+ "transportPCE committers - ODL";
+ description
+ "YANG definitions for using REST API in NBI notifications module. Copyright
+ (c) 2020 ORANGE and others. All rights reserved.";
+
+ revision 2020-11-30 {
+ description
+ "Initial revision of NBI notifications";
+ }
+
+ grouping notification-service {
+ leaf message {
+ type string;
+ mandatory true;
+ description
+ "Message for the specified service";
+ }
+ leaf service-name {
+ type string;
+ mandatory true;
+ description
+ "Identifier for the service to be created in the ROADM network, e.g., CLFI, CLCI, etc.
+ This is reported against the service, but may not get reflected in the service in the network.";
+ }
+ leaf common-id {
+ type string;
+ description
+ "To be used by the ROADM controller to identify the routing constraints
+ received from planning application (PED).";
+ }
+ leaf connection-type {
+ type org-openroadm-common-service-types:connection-type;
+ mandatory true;
+ }
+ container service-a-end {
+ uses org-openroadm-common-service-types:service-endpoint;
+ }
+ container service-z-end {
+ uses org-openroadm-common-service-types:service-endpoint;
+ }
+ leaf response-failed {
+ type string;
+ description
+ "Response of the error if the service request encountered an anomaly";
+ }
+ leaf operational-state {
+ type org-openroadm-common-state-types:state;
+ config false;
+ description
+ "Operational State: Actual state of service";
+ }
+ }
+
+ container notification-service {
+ description
+ "Model used to send a notification from a service request";
+ uses notification-service;
+ }
+
+ rpc get-notifications-service {
+ description "Get the notifications service send by ServiceHandler by filtering through connection type";
+ input {
+ leaf connection-type {
+ type org-openroadm-common-service-types:connection-type;
+ mandatory true;
+ description
+ "Type connection of the service ";
+ }
+ leaf id-consumer {
+ type string;
+ mandatory true;
+ description
+ "Unique ID for the consumer";
+ }
+ leaf group-id {
+ type string;
+ mandatory true;
+ description
+ "ID Group for the consumer";
+ }
+ }
+ output {
+ list notification-service {
+ uses notification-service;
+ }
+ }
+ }
+
+ notification publish-notification-service {
+ description "Publish the notifications service for topic";
+ leaf topic {
+ type string;
+ mandatory true;
+ description
+ "Topic where to send the notification service";
+ }
+ uses notification-service;
+ }
+}
--- /dev/null
+<?xml version="1.0" encoding="UTF-8"?>
+<!-- Copyright © 2020 Orange and others. All rights reserved. This program
+ and the accompanying materials are made available under the terms of the
+ Eclipse Public License v1.0 which accompanies this distribution, and is available
+ at http://www.eclipse.org/legal/epl-v10.html INTERNAL -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.opendaylight.mdsal</groupId>
+ <artifactId>binding-parent</artifactId>
+ <version>7.0.5</version>
+ <relativePath />
+ </parent>
+
+ <groupId>org.opendaylight.transportpce</groupId>
+ <artifactId>transportpce-nbinotifications</artifactId>
+ <version>3.0.0-SNAPSHOT</version>
+ <packaging>bundle</packaging>
+
+ <properties>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ <kafka.version>2.6.0</kafka.version>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>transportpce-common</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>transportpce-api</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>${project.groupId}.ordmodels</groupId>
+ <artifactId>transportpce-ordmodels-service</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka-clients</artifactId>
+ <version>${kafka.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.opendaylight.mdsal</groupId>
+ <artifactId>mdsal-binding-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.opendaylight.mdsal</groupId>
+ <artifactId>mdsal-binding-dom-adapter</artifactId>
+ </dependency>
+ <!-- Testing Dependencies -->
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>test-common</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+</project>
--- /dev/null
+/*
+ * Copyright © 2020 Orange, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.transportpce.nbinotifications.consumer;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.opendaylight.transportpce.common.converter.JsonStringConverter;
+import org.opendaylight.transportpce.nbinotifications.serialization.ConfigConstants;
+import org.opendaylight.transportpce.nbinotifications.serialization.NotificationServiceDeserializer;
+import org.opendaylight.transportpce.nbinotifications.utils.NbiNotificationsUtils;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.get.notifications.service.output.NotificationService;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class Subscriber {
+ private static final Logger LOG = LoggerFactory.getLogger(Subscriber.class);
+
+ private final Consumer<String, NotificationService> consumer;
+
+ public Subscriber(String id, String groupId, String suscriberServer,
+ JsonStringConverter<org.opendaylight.yang.gen.v1
+ .nbi.notifications.rev201130.NotificationService> deserializer) {
+ Properties propsConsumer = NbiNotificationsUtils.loadProperties("subscriber.properties");
+ propsConsumer.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
+ propsConsumer.put(ConsumerConfig.CLIENT_ID_CONFIG, id);
+ propsConsumer.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+ propsConsumer.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG , NotificationServiceDeserializer.class);
+ propsConsumer.put(ConfigConstants.CONVERTER , deserializer);
+ if (suscriberServer != null && !suscriberServer.isBlank()) {
+ propsConsumer.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, suscriberServer);
+ }
+ LOG.info("Suscribing for group id {}, client config id {} with properties {}", groupId, id, propsConsumer);
+ consumer = new KafkaConsumer<>(propsConsumer);
+ }
+
+ public List<NotificationService> subscribeService(String topicName) {
+ LOG.info("Subscribe request to topic '{}' ", topicName);
+ consumer.subscribe(Collections.singleton(topicName));
+ final ConsumerRecords<String, NotificationService> consumerRecords = consumer.poll(Duration.ofMillis(1000));
+ List<NotificationService> notificationServiceList = new ArrayList<>();
+ YangInstanceIdentifier.of(NotificationService.QNAME);
+ for (ConsumerRecord<String, NotificationService> record : consumerRecords) {
+ if (record.value() != null) {
+ notificationServiceList.add(record.value());
+ }
+ }
+ LOG.info("Getting records '{}' ", notificationServiceList);
+ consumer.unsubscribe();
+ consumer.close();
+ return notificationServiceList;
+ }
+
+ @VisibleForTesting public Subscriber(Consumer<String, NotificationService> consumer) {
+ this.consumer = consumer;
+ }
+}
--- /dev/null
+/*
+ * Copyright © 2020 Orange, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.transportpce.nbinotifications.impl;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import java.util.List;
+import org.opendaylight.transportpce.common.converter.JsonStringConverter;
+import org.opendaylight.transportpce.nbinotifications.consumer.Subscriber;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.GetNotificationsServiceInput;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.GetNotificationsServiceOutput;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.GetNotificationsServiceOutputBuilder;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.NbiNotificationsService;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.get.notifications.service.output.NotificationService;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class NbiNotificationsImpl implements NbiNotificationsService {
+ private static final Logger LOG = LoggerFactory.getLogger(NbiNotificationsImpl.class);
+ private final JsonStringConverter<org.opendaylight.yang.gen.v1
+ .nbi.notifications.rev201130.NotificationService> converter;
+ private final String server;
+
+ public NbiNotificationsImpl(JsonStringConverter<org.opendaylight.yang.gen.v1
+ .nbi.notifications.rev201130.NotificationService> converter, String server) {
+ this.converter = converter;
+ this.server = server;
+ }
+
+ @Override
+ public ListenableFuture<RpcResult<GetNotificationsServiceOutput>> getNotificationsService(
+ GetNotificationsServiceInput input) {
+ LOG.info("RPC getNotificationsService received");
+ if (input == null || input.getIdConsumer() == null || input.getGroupId() == null) {
+ LOG.warn("Missing mandatory params for input {}", input);
+ return RpcResultBuilder.success(new GetNotificationsServiceOutputBuilder().build()).buildFuture();
+ }
+ Subscriber subscriber = new Subscriber(input.getIdConsumer(), input.getGroupId(), server, converter);
+ List<NotificationService> notificationServiceList = subscriber
+ .subscribeService(input.getConnectionType().getName());
+ GetNotificationsServiceOutputBuilder output = new GetNotificationsServiceOutputBuilder()
+ .setNotificationService(notificationServiceList);
+ return RpcResultBuilder.success(output.build()).buildFuture();
+ }
+}
--- /dev/null
+/*
+ * Copyright © 2020 Orange, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.transportpce.nbinotifications.impl;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.opendaylight.mdsal.binding.api.NotificationService;
+import org.opendaylight.mdsal.binding.api.RpcProviderService;
+import org.opendaylight.mdsal.binding.dom.codec.spi.BindingDOMCodecServices;
+import org.opendaylight.transportpce.common.converter.JsonStringConverter;
+import org.opendaylight.transportpce.nbinotifications.listener.NbiNotificationsListenerImpl;
+import org.opendaylight.transportpce.nbinotifications.producer.Publisher;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.NbiNotificationsListener;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.NbiNotificationsService;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.concepts.ObjectRegistration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class NbiNotificationsProvider {
+
+ private static final Logger LOG = LoggerFactory.getLogger(NbiNotificationsProvider.class);
+ private static Map<String, Publisher> publishersMap = new HashMap<>();
+
+ private final RpcProviderService rpcService;
+ private ObjectRegistration<NbiNotificationsService> rpcRegistration;
+ private ListenerRegistration<NbiNotificationsListener> listenerRegistration;
+ private NotificationService notificationService;
+ private final JsonStringConverter<org.opendaylight.yang.gen.v1
+ .nbi.notifications.rev201130.NotificationService> converter;
+ private final String suscriberServer;
+
+
+ public NbiNotificationsProvider(List<String> topics,
+ String suscriberServer, String publisherServer,
+ RpcProviderService rpcProviderService, NotificationService notificationService,
+ BindingDOMCodecServices bindingDOMCodecServices) {
+ this.rpcService = rpcProviderService;
+ this.notificationService = notificationService;
+ converter = new JsonStringConverter<>(bindingDOMCodecServices);
+ for (String topic: topics) {
+ LOG.info("Creating publisher for topic {}", topic);
+ publishersMap.put(topic, new Publisher(topic, publisherServer, converter));
+ }
+ this.suscriberServer = suscriberServer;
+ }
+
+ /**
+ * Method called when the blueprint container is created.
+ */
+ public void init() {
+ LOG.info("NbiNotificationsProvider Session Initiated");
+ rpcRegistration = rpcService.registerRpcImplementation(NbiNotificationsService.class,
+ new NbiNotificationsImpl(converter, suscriberServer));
+ listenerRegistration = notificationService.registerNotificationListener(
+ new NbiNotificationsListenerImpl(publishersMap));
+ }
+
+ /**
+ * Method called when the blueprint container is destroyed.
+ */
+ public void close() {
+ for (Publisher publisher : publishersMap.values()) {
+ publisher.close();
+ }
+ rpcRegistration.close();
+ listenerRegistration.close();
+ LOG.info("NbiNotificationsProvider Closed");
+ }
+
+}
--- /dev/null
+/*
+ * Copyright © 2020 Orange, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.transportpce.nbinotifications.listener;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.opendaylight.transportpce.nbinotifications.producer.Publisher;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.NbiNotificationsListener;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.NotificationServiceBuilder;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.PublishNotificationService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class NbiNotificationsListenerImpl implements NbiNotificationsListener {
+ private static final Logger LOG = LoggerFactory.getLogger(NbiNotificationsListenerImpl.class);
+ private Map<String, Publisher> publishersMap = new HashMap<>();
+
+ public NbiNotificationsListenerImpl(Map<String, Publisher> publishersMap) {
+ this.publishersMap = publishersMap;
+ }
+
+ @Override
+ public void onPublishNotificationService(PublishNotificationService notification) {
+ LOG.info("Receiving request for publishing notification service");
+ String topic = notification.getTopic();
+ if (!publishersMap.containsKey(topic)) {
+ LOG.error("Unknown topic {}", topic);
+ return;
+ }
+ Publisher publisher = publishersMap.get(topic);
+ publisher.sendEvent(new NotificationServiceBuilder().setCommonId(notification.getCommonId())
+ .setConnectionType(notification.getConnectionType()).setMessage(notification.getMessage())
+ .setOperationalState(notification.getOperationalState())
+ .setResponseFailed(notification.getResponseFailed())
+ .setServiceAEnd(notification.getServiceAEnd()).setServiceName(notification.getServiceName())
+ .setServiceZEnd(notification.getServiceZEnd()).build());
+
+ }
+
+}
--- /dev/null
+/*
+ * Copyright © 2020 Orange, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.transportpce.nbinotifications.producer;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.util.Properties;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.opendaylight.transportpce.common.converter.JsonStringConverter;
+import org.opendaylight.transportpce.nbinotifications.serialization.ConfigConstants;
+import org.opendaylight.transportpce.nbinotifications.serialization.NotificationServiceSerializer;
+import org.opendaylight.transportpce.nbinotifications.utils.NbiNotificationsUtils;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.NotificationService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class Publisher {
+ private static final Logger LOG = LoggerFactory.getLogger(Publisher.class);
+
+ private final String id;
+ private final Producer<String, NotificationService> producer;
+
+ public Publisher(String id, String publisherServer, JsonStringConverter<NotificationService> serializer) {
+ Properties properties = NbiNotificationsUtils.loadProperties("publisher.properties");
+ properties.put(ProducerConfig.CLIENT_ID_CONFIG, id);
+ if (publisherServer != null && !publisherServer.isBlank()) {
+ properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, publisherServer);
+ }
+ properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+ properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG , NotificationServiceSerializer.class);
+ properties.put(ConfigConstants.CONVERTER , serializer);
+ LOG.info("Creationg publisher for id {} with properties {}", id, properties);
+ producer = new KafkaProducer<>(properties);
+ this.id = id;
+ }
+
+ @VisibleForTesting Publisher(String id, Producer<String, NotificationService> producer) {
+ this.producer = producer;
+ this.id = id;
+ }
+
+ public void close() {
+ producer.close();
+ }
+
+ public void sendEvent(NotificationService notificationService) {
+ LOG.info("SendEvent request to topic '{}' ", notificationService.getConnectionType().getName());
+ producer.send(new ProducerRecord<>(notificationService.getConnectionType().getName(), id, notificationService));
+ producer.flush();
+ }
+}
--- /dev/null
+/*
+ * Copyright © 2021 Orange, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.transportpce.nbinotifications.serialization;
+
+public final class ConfigConstants {
+ private ConfigConstants() {
+ }
+
+ public static final String CONVERTER = "converter";
+
+}
--- /dev/null
+/*
+ * Copyright © 2021 Orange, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.transportpce.nbinotifications.serialization;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.opendaylight.transportpce.common.converter.JsonStringConverter;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.get.notifications.service.output.NotificationService;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.get.notifications.service.output.NotificationServiceBuilder;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.codec.gson.JSONCodecFactorySupplier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class NotificationServiceDeserializer implements Deserializer<NotificationService> {
+ private static final Logger LOG = LoggerFactory.getLogger(NotificationServiceDeserializer.class);
+ private JsonStringConverter<org.opendaylight.yang.gen.v1
+ .nbi.notifications.rev201130.NotificationService> converter;
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void configure(Map<String, ?> configs, boolean isKey) {
+ LOG.info("Deserializer configuration {}", configs);
+ if (configs.containsKey(ConfigConstants.CONVERTER)
+ && configs.get(ConfigConstants.CONVERTER) instanceof JsonStringConverter<?>) {
+ converter = (JsonStringConverter<org.opendaylight.yang.gen.v1
+ .nbi.notifications.rev201130.NotificationService>) configs
+ .get(ConfigConstants.CONVERTER);
+ }
+ }
+
+ @Override
+ public NotificationService deserialize(String topic, byte[] data) {
+ if (converter == null) {
+ throw new IllegalArgumentException(
+ "Converter should be configured through configure method of deserializer");
+ }
+ String value = new String(data, StandardCharsets.UTF_8);
+ // The message published is
+ // org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.NotificationService
+ // we have to map it to
+ // org.opendaylight.yang.gen
+ // .v1.nbi.notifications.rev201130.get.notifications.service.output.NotificationService
+ org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.NotificationService mappedString = converter
+ .createDataObjectFromJsonString(YangInstanceIdentifier.of(
+ org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.NotificationService.QNAME),
+ value,
+ JSONCodecFactorySupplier.RFC7951);
+ if (mappedString != null) {
+ LOG.info("Reading event {}", mappedString);
+ return new NotificationServiceBuilder().setCommonId(mappedString.getCommonId())
+ .setConnectionType(mappedString.getConnectionType()).setMessage(mappedString.getMessage())
+ .setOperationalState(mappedString.getOperationalState())
+ .setResponseFailed(mappedString.getResponseFailed()).setServiceName(mappedString.getServiceName())
+ .setServiceAEnd(mappedString.getServiceAEnd()).setServiceZEnd(mappedString.getServiceZEnd())
+ .build();
+ }
+ return null;
+ }
+
+}
--- /dev/null
+/*
+ * Copyright © 2021 Orange, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.transportpce.nbinotifications.serialization;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+import org.apache.kafka.common.serialization.Serializer;
+import org.opendaylight.transportpce.common.converter.JsonStringConverter;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.NotificationService;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.codec.gson.JSONCodecFactorySupplier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class NotificationServiceSerializer implements Serializer<NotificationService> {
+ private static final Logger LOG = LoggerFactory.getLogger(NotificationServiceSerializer.class);
+ private JsonStringConverter<NotificationService> converter;
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void configure(Map<String, ?> configs, boolean isKey) {
+ LOG.info("Deserializer configuration {}", configs);
+ if (configs.containsKey(ConfigConstants.CONVERTER)
+ && configs.get(ConfigConstants.CONVERTER) instanceof JsonStringConverter<?>) {
+ converter = (JsonStringConverter<NotificationService>) configs.get(ConfigConstants.CONVERTER);
+ }
+ }
+
+ @Override
+ public byte[] serialize(String topic, NotificationService data) {
+ if (converter == null) {
+ throw new IllegalArgumentException(
+ "Converter should be" + "configured through configure method of serializer");
+ }
+ if (data == null) {
+ return new byte[0];
+ }
+ try {
+ InstanceIdentifier<NotificationService> iid = InstanceIdentifier.builder(NotificationService.class).build();
+ String serialized = converter.createJsonStringFromDataObject(iid, data, JSONCodecFactorySupplier.RFC7951);
+ LOG.info("Serialized event {}", serialized);
+ return serialized.getBytes(StandardCharsets.UTF_8);
+ } catch (IOException e) {
+ return new byte[0];
+ }
+ }
+}
--- /dev/null
+/*
+ * Copyright © 2020 Orange, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.transportpce.nbinotifications.utils;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Properties;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public final class NbiNotificationsUtils {
+
+ private static final Logger LOG = LoggerFactory.getLogger(NbiNotificationsUtils.class);
+
+ private NbiNotificationsUtils() {
+ }
+
+ public static Properties loadProperties(String propertyFileName) {
+ Properties props = new Properties();
+ InputStream inputStream = NbiNotificationsUtils.class.getClassLoader()
+ .getResourceAsStream(propertyFileName);
+ try {
+ if (inputStream != null) {
+ props.load(inputStream);
+ } else {
+ LOG.warn("Kafka property file '{}' is empty", propertyFileName);
+ }
+ } catch (IOException e) {
+ LOG.warn("Kafka property file '{}' was not found in the classpath", propertyFileName);
+ }
+ return props;
+ }
+}
--- /dev/null
+<?xml version="1.0" encoding="UTF-8"?>
+<!-- vi: set et smarttab sw=4 tabstop=4: -->
+<!-- Copyright © 2020 Orange and others. All rights reserved. This program and the accompanying materials
+ are made available under the terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ and is available at http://www.eclipse.org/legal/epl-v10.html -->
+<blueprint xmlns="http://www.osgi.org/xmlns/blueprint/v1.0.0" xmlns:odl="http://opendaylight.org/xmlns/blueprint/v1.0.0"
+xmlns:cm="http://aries.apache.org/blueprint/xmlns/blueprint-cm/v1.1.0"
+ odl:use-default-for-reference-types="true">
+ <cm:property-placeholder persistent-id="org.opendaylight.transportpce.nbinotifications" update-strategy="reload">
+ <cm:default-properties>
+ <cm:property name="suscriber.server" value="" />
+ <cm:property name="publisher.server" value="" />
+ </cm:default-properties>
+ </cm:property-placeholder>
+ <reference id="rpcService" interface="org.opendaylight.mdsal.binding.api.RpcProviderService"/>
+ <reference id="notificationService" interface="org.opendaylight.mdsal.binding.api.NotificationService"/>
+ <reference id="bindingDOMCodecServices" interface="org.opendaylight.mdsal.binding.dom.codec.spi.BindingDOMCodecServices" />
+ <bean id="provider"
+ class="org.opendaylight.transportpce.nbinotifications.impl.NbiNotificationsProvider"
+ init-method="init" destroy-method="close">
+ <argument>
+ <list value-type="java.lang.String">
+ <value>PceListener</value>
+ <value>ServiceHandlerOperations</value>
+ <value>ServiceHandler</value>
+ <value>RendererListener</value>
+ </list>
+ </argument>
+ <argument value="${suscriber.server}"/>
+ <argument value="${publisher.server}"/>
+ <argument ref="rpcService" />
+ <argument ref="notificationService" />
+ <argument ref="bindingDOMCodecServices" />
+ </bean>
+</blueprint>
--- /dev/null
+#Kafka Producer/AdminClient properties
+bootstrap.servers=localhost:9092
+acks=all
+retries=3
+max.in.flight.requests.per.connection=1
+batch.size=16384
+linger.ms=1
+buffer.memory=33554432
--- /dev/null
+#Kafka Consumer properties
+bootstrap.servers=localhost:9092
+enable.auto.commit=true
+auto.commit.interval.ms=1000
+auto.offset.reset=earliest
--- /dev/null
+/*
+ * Copyright © 2020 Orange, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.transportpce.nbinotifications.consumer;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.MockConsumer;
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.common.TopicPartition;
+import org.junit.Before;
+import org.junit.Test;
+import org.opendaylight.transportpce.nbinotifications.utils.NotificationServiceDataUtils;
+import org.opendaylight.transportpce.test.AbstractTest;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.get.notifications.service.output.NotificationService;
+
+public class SubscriberTest extends AbstractTest {
+ private static final String TOPIC = "topic";
+ private static final int PARTITION = 0;
+ private MockConsumer<String, NotificationService> mockConsumer;
+ private Subscriber subscriber;
+
+ @Before
+ public void setUp() {
+ mockConsumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
+ subscriber = new Subscriber(mockConsumer);
+ }
+
+ @Test
+ public void subscribeServiceShouldBeSuccessful() {
+ // from https://www.baeldung.com/kafka-mockconsumer
+ ConsumerRecord<String, NotificationService> record = new ConsumerRecord<String, NotificationService>(
+ TOPIC, PARTITION, 0L, "key", NotificationServiceDataUtils.buildReceivedEvent());
+ mockConsumer.schedulePollTask(() -> {
+ mockConsumer.rebalance(Collections.singletonList(new TopicPartition(TOPIC, PARTITION)));
+ mockConsumer.addRecord(record);
+ });
+
+ Map<TopicPartition, Long> startOffsets = new HashMap<>();
+ TopicPartition tp = new TopicPartition(TOPIC, PARTITION);
+ startOffsets.put(tp, 0L);
+ mockConsumer.updateBeginningOffsets(startOffsets);
+ List<NotificationService> result = subscriber.subscribeService(TOPIC);
+ assertEquals("There should be 1 record", 1, result.size());
+ assertTrue("Consumer should be closed", mockConsumer.closed());
+ }
+}
--- /dev/null
+/*
+ * Copyright © 2020 Orange, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.transportpce.nbinotifications.impl;
+
+import static org.junit.Assert.assertNull;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import java.util.concurrent.ExecutionException;
+import org.junit.Before;
+import org.junit.Test;
+import org.opendaylight.transportpce.common.converter.JsonStringConverter;
+import org.opendaylight.transportpce.test.AbstractTest;
+import org.opendaylight.yang.gen.v1.http.org.openroadm.common.service.types.rev190531.ConnectionType;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.GetNotificationsServiceInputBuilder;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.GetNotificationsServiceOutput;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+
+public class NbiNotificationsImplTest extends AbstractTest {
+ private NbiNotificationsImpl nbiNotificationsImpl;
+
+ @Before
+ public void setUp() {
+ JsonStringConverter<org.opendaylight.yang.gen.v1
+ .nbi.notifications.rev201130.NotificationService> converter = new JsonStringConverter<>(
+ getDataStoreContextUtil().getBindingDOMCodecServices());
+ nbiNotificationsImpl = new NbiNotificationsImpl(converter, "localhost:8080");
+ }
+
+ public void getNotificationsServiceEmptyDataTest() throws InterruptedException, ExecutionException {
+ ListenableFuture<RpcResult<GetNotificationsServiceOutput>> result =
+ nbiNotificationsImpl.getNotificationsService(new GetNotificationsServiceInputBuilder().build());
+ assertNull("Should be null", result.get().getResult().getNotificationService());
+ }
+
+ @Test
+ public void getNotificationsServiceTest() throws InterruptedException, ExecutionException {
+ GetNotificationsServiceInputBuilder builder = new GetNotificationsServiceInputBuilder();
+ builder.setGroupId("groupId");
+ builder.setIdConsumer("consumerId");
+ builder.setConnectionType(ConnectionType.Service);
+ ListenableFuture<RpcResult<GetNotificationsServiceOutput>> result =
+ nbiNotificationsImpl.getNotificationsService(builder.build());
+ assertNull("Should be null", result.get().getResult().getNotificationService());
+ }
+}
--- /dev/null
+/*
+ * Copyright © 2020 Orange, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.transportpce.nbinotifications.impl;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import java.util.Arrays;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.opendaylight.mdsal.binding.api.NotificationService;
+import org.opendaylight.mdsal.binding.api.RpcProviderService;
+import org.opendaylight.transportpce.nbinotifications.listener.NbiNotificationsListenerImpl;
+import org.opendaylight.transportpce.test.AbstractTest;
+
+public class NbiNotificationsProviderTest extends AbstractTest {
+
+ @Mock
+ RpcProviderService rpcProviderRegistry;
+
+ @Mock
+ private NotificationService notificationService;
+
+ @Before
+ public void init() {
+ MockitoAnnotations.openMocks(this);
+
+ }
+
+ @Test
+ public void initTest() {
+ NbiNotificationsProvider provider = new NbiNotificationsProvider(
+ Arrays.asList("topic1", "topic2"), "localhost:8080", "localhost:8080",
+ rpcProviderRegistry, notificationService,
+ getDataStoreContextUtil().getBindingDOMCodecServices());
+ provider.init();
+ verify(rpcProviderRegistry, times(1))
+ .registerRpcImplementation(any(), any(NbiNotificationsImpl.class));
+ verify(notificationService, times(1))
+ .registerNotificationListener(any(NbiNotificationsListenerImpl.class));
+ }
+}
--- /dev/null
+/*
+ * Copyright © 2021 Orange, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.transportpce.nbinotifications.listener;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import java.util.Map;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.opendaylight.transportpce.nbinotifications.producer.Publisher;
+import org.opendaylight.transportpce.test.AbstractTest;
+import org.opendaylight.yang.gen.v1.http.org.openroadm.common.service.types.rev190531.ConnectionType;
+import org.opendaylight.yang.gen.v1.http.org.openroadm.common.state.types.rev181130.State;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.PublishNotificationService;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.PublishNotificationServiceBuilder;
+
+public class NbiNotificationsListenerImplTest extends AbstractTest {
+ @Mock
+ private Publisher publisher;
+
+ @Before
+ public void setUp() {
+ MockitoAnnotations.openMocks(this);
+ }
+
+ @Test
+ public void onPublishNotificationServiceTest() {
+ NbiNotificationsListenerImpl listener = new NbiNotificationsListenerImpl(Map.of("test", publisher));
+ PublishNotificationService notification = new PublishNotificationServiceBuilder().setTopic("test")
+ .setCommonId("commonId").setConnectionType(ConnectionType.Service).setMessage("Service deleted")
+ .setOperationalState(State.OutOfService).setServiceName("service name").build();
+ listener.onPublishNotificationService(notification);
+ verify(publisher, times(1)).sendEvent(any());
+ }
+
+ @Test
+ public void onPublishNotificationServiceWrongTopicTest() {
+ NbiNotificationsListenerImpl listener = new NbiNotificationsListenerImpl(Map.of("test", publisher));
+ PublishNotificationService notification = new PublishNotificationServiceBuilder().setTopic("wrongtopic")
+ .setCommonId("commonId").setConnectionType(ConnectionType.Service).setMessage("Service deleted")
+ .setOperationalState(State.OutOfService).setServiceName("service name").build();
+ listener.onPublishNotificationService(notification);
+ verify(publisher, times(0)).sendEvent(any());
+ }
+}
--- /dev/null
+/*
+ * Copyright © 2020 Orange, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.transportpce.nbinotifications.producer;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.Map;
+import org.apache.kafka.clients.producer.MockProducer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.junit.Before;
+import org.junit.Test;
+import org.opendaylight.transportpce.common.converter.JsonStringConverter;
+import org.opendaylight.transportpce.nbinotifications.serialization.ConfigConstants;
+import org.opendaylight.transportpce.nbinotifications.serialization.NotificationServiceSerializer;
+import org.opendaylight.transportpce.test.AbstractTest;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.NotificationService;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.codec.gson.JSONCodecFactorySupplier;
+
+public class PublisherTest extends AbstractTest {
+ private JsonStringConverter<NotificationService> converter;
+ private Publisher publisher;
+ private MockProducer<String, NotificationService> mockProducer;
+
+ @Before
+ public void setUp() {
+ NotificationServiceSerializer serializer = new NotificationServiceSerializer();
+ Map<String, Object> properties = Map.of(ConfigConstants.CONVERTER , serializer);
+ serializer.configure(properties, false);
+ mockProducer = new MockProducer<>(true, new StringSerializer(), serializer);
+ converter = new JsonStringConverter<NotificationService>(
+ getDataStoreContextUtil().getBindingDOMCodecServices());
+ publisher = new Publisher("test",mockProducer);
+ }
+
+ @Test
+ public void sendEventShouldBeSuccessful() throws IOException {
+ String json = Files.readString(Paths.get("src/test/resources/event.json"));
+ NotificationService notificationService = converter
+ .createDataObjectFromJsonString(YangInstanceIdentifier.of(NotificationService.QNAME),
+ json, JSONCodecFactorySupplier.RFC7951);
+ publisher.sendEvent(notificationService);
+ assertEquals("We should have one message", 1, mockProducer.history().size());
+ assertEquals("Key should be test", "test",mockProducer.history().get(0).key());
+ }
+
+}
--- /dev/null
+/*
+ * Copyright © 2021 Orange, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.transportpce.nbinotifications.serialization;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.Map;
+import org.junit.Test;
+import org.opendaylight.transportpce.common.converter.JsonStringConverter;
+import org.opendaylight.transportpce.test.AbstractTest;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.get.notifications.service.output.NotificationService;
+
+public class NotificationServiceDeserializerTest extends AbstractTest {
+
+ @Test
+ public void deserializeTest() throws IOException {
+ JsonStringConverter<org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.NotificationService> converter =
+ new JsonStringConverter<>(getDataStoreContextUtil().getBindingDOMCodecServices());
+ NotificationServiceDeserializer deserializer = new NotificationServiceDeserializer();
+ Map<String, Object> configs = Map.of(ConfigConstants.CONVERTER, converter);
+ deserializer.configure(configs, false);
+ NotificationService readEvent = deserializer.deserialize("Test",
+ Files.readAllBytes(Paths.get("src/test/resources/event.json")));
+ deserializer.close();
+ assertEquals("Service name should be service1", "service1", readEvent.getServiceName());
+ }
+}
--- /dev/null
+/*
+ * Copyright © 2021 Orange, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.transportpce.nbinotifications.serialization;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.Map;
+import org.junit.Test;
+import org.opendaylight.transportpce.common.converter.JsonStringConverter;
+import org.opendaylight.transportpce.test.AbstractTest;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.NotificationService;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.codec.gson.JSONCodecFactorySupplier;
+
+public class NotificationServiceSerializerTest extends AbstractTest {
+
+ @Test
+ public void serializeTest() throws IOException {
+ JsonStringConverter<NotificationService> converter =
+ new JsonStringConverter<>(getDataStoreContextUtil().getBindingDOMCodecServices());
+ String json = Files.readString(Paths.get("src/test/resources/event.json"));
+ NotificationService notificationService = converter
+ .createDataObjectFromJsonString(YangInstanceIdentifier.of(NotificationService.QNAME),
+ json, JSONCodecFactorySupplier.RFC7951);
+ NotificationServiceSerializer serializer = new NotificationServiceSerializer();
+ Map<String, Object> configs = Map.of(ConfigConstants.CONVERTER, converter);
+ serializer.configure(configs, false);
+ byte[] data = serializer.serialize("test", notificationService);
+ serializer.close();
+ assertNotNull("Serialized data should not be null", data);
+ String expectedJson = Files.readString(Paths.get("src/test/resources/expected_event.json"));
+ assertEquals("The event should be equals", expectedJson, new String(data, StandardCharsets.UTF_8));
+ }
+}
--- /dev/null
+/*
+ * Copyright © 2020 Orange, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.transportpce.nbinotifications.utils;
+
+import org.opendaylight.yang.gen.v1.http.org.openroadm.common.node.types.rev181130.NodeIdType;
+import org.opendaylight.yang.gen.v1.http.org.openroadm.common.service.types.rev190531.ConnectionType;
+import org.opendaylight.yang.gen.v1.http.org.openroadm.common.service.types.rev190531.service.endpoint.RxDirection;
+import org.opendaylight.yang.gen.v1.http.org.openroadm.common.service.types.rev190531.service.endpoint.TxDirection;
+import org.opendaylight.yang.gen.v1.http.org.openroadm.common.service.types.rev190531.service.lgx.LgxBuilder;
+import org.opendaylight.yang.gen.v1.http.org.openroadm.common.service.types.rev190531.service.port.PortBuilder;
+import org.opendaylight.yang.gen.v1.http.org.openroadm.common.state.types.rev181130.State;
+import org.opendaylight.yang.gen.v1.http.org.openroadm.service.format.rev190531.ServiceFormat;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.NotificationService;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.NotificationServiceBuilder;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.notification.service.ServiceAEndBuilder;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.notification.service.ServiceZEndBuilder;
+import org.opendaylight.yangtools.yang.common.Uint32;
+
+public final class NotificationServiceDataUtils {
+
+ private NotificationServiceDataUtils() {
+ }
+
+ public static NotificationService buildSendEventInput() {
+ NotificationServiceBuilder notificationServiceBuilder = new NotificationServiceBuilder()
+ .setMessage("message")
+ .setServiceName("service1")
+ .setOperationalState(State.InService)
+ .setResponseFailed("")
+ .setCommonId("commond-id")
+ .setConnectionType(ConnectionType.Service)
+ .setServiceZEnd(getServiceZEndBuild().build())
+ .setServiceAEnd(getServiceAEndBuild().build());
+
+ return notificationServiceBuilder.build();
+ }
+
+ public static org.opendaylight.yang.gen.v1
+ .nbi.notifications.rev201130.get.notifications.service.output.NotificationService buildReceivedEvent() {
+ org.opendaylight.yang.gen.v1
+ .nbi.notifications.rev201130.get.notifications.service.output.NotificationServiceBuilder
+ notificationServiceBuilder = new org.opendaylight.yang.gen.v1
+ .nbi.notifications.rev201130.get.notifications.service.output.NotificationServiceBuilder()
+ .setMessage("message")
+ .setServiceName("service1")
+ .setOperationalState(State.InService)
+ .setResponseFailed("")
+ .setCommonId("commond-id")
+ .setConnectionType(ConnectionType.Service)
+ .setServiceZEnd(getServiceZEndBuild().build())
+ .setServiceAEnd(getServiceAEndBuild().build());
+
+ return notificationServiceBuilder.build();
+ }
+
+ public static ServiceAEndBuilder getServiceAEndBuild() {
+ return new ServiceAEndBuilder()
+ .setClli("clli").setServiceFormat(ServiceFormat.OC).setServiceRate(Uint32.valueOf(1))
+ .setNodeId(new NodeIdType("XPONDER-1-2"))
+ .setTxDirection(getTxDirection())
+ .setRxDirection(getRxDirection());
+ }
+
+ public static ServiceZEndBuilder getServiceZEndBuild() {
+ return new ServiceZEndBuilder()
+ .setClli("clli").setServiceFormat(ServiceFormat.OC).setServiceRate(Uint32.valueOf(1))
+ .setNodeId(new NodeIdType("XPONDER-1-2"))
+ .setTxDirection(getTxDirection())
+ .setRxDirection(getRxDirection());
+ }
+
+ private static TxDirection getTxDirection() {
+ return new org.opendaylight.yang.gen.v1.http.org.openroadm.common.service.types.rev190531.service
+ .endpoint.TxDirectionBuilder().setPort(new PortBuilder().setPortDeviceName("device name")
+ .setPortName("port name").setPortRack("port rack").setPortShelf("port shelf")
+ .setPortSlot("port slot").setPortSubSlot("port subslot").setPortType("port type").build())
+ .setLgx(new LgxBuilder().setLgxDeviceName("lgx device name").setLgxPortName("lgx port name")
+ .setLgxPortRack("lgx port rack").setLgxPortShelf("lgx port shelf").build())
+ .build();
+ }
+
+ private static RxDirection getRxDirection() {
+ return new org.opendaylight.yang.gen.v1.http.org.openroadm.common.service.types.rev190531.service
+ .endpoint.RxDirectionBuilder()
+ .setPort(new PortBuilder().setPortDeviceName("device name").setPortName("port name")
+ .setPortRack("port rack").setPortShelf("port shelf").setPortSlot("port slot")
+ .setPortSubSlot("port subslot").setPortType("port type").build())
+ .setLgx(new LgxBuilder().setLgxDeviceName("lgx device name")
+ .setLgxPortName("lgx port name").setLgxPortRack("lgx port rack")
+ .setLgxPortShelf("lgx port shelf").build())
+ .build();
+ }
+}
--- /dev/null
+{
+ "notification-service": {
+ "service-name": "service1",
+ "service-a-end": {
+ "service-format": "OC",
+ "node-id": "XPONDER-1-2",
+ "service-rate": 1,
+ "clli": "clli",
+ "tx-direction": {
+ "port": {
+ "port-slot": "port slot",
+ "port-device-name": "device name",
+ "port-rack": "port rack",
+ "port-shelf": "port shelf",
+ "port-type": "port type",
+ "port-sub-slot": "port subslot",
+ "port-name": "port name"
+ },
+ "lgx": {
+ "lgx-port-name": "lgx port name",
+ "lgx-port-shelf": "lgx port shelf",
+ "lgx-port-rack": "lgx port rack",
+ "lgx-device-name": "lgx device name"
+ }
+ },
+ "rx-direction": {
+ "port": {
+ "port-slot": "port slot",
+ "port-device-name": "device name",
+ "port-rack": "port rack",
+ "port-shelf": "port shelf",
+ "port-type": "port type",
+ "port-sub-slot": "port subslot",
+ "port-name": "port name"
+ },
+ "lgx": {
+ "lgx-port-name": "lgx port name",
+ "lgx-port-shelf": "lgx port shelf",
+ "lgx-port-rack": "lgx port rack",
+ "lgx-device-name": "lgx device name"
+ }
+ }
+ },
+ "common-id": "commond-id",
+ "operational-state": "inService",
+ "connection-type": "service",
+ "service-z-end": {
+ "service-format": "OC",
+ "node-id": "XPONDER-1-2",
+ "service-rate": 1,
+ "clli": "clli",
+ "tx-direction": {
+ "port": {
+ "port-slot": "port slot",
+ "port-device-name": "device name",
+ "port-rack": "port rack",
+ "port-shelf": "port shelf",
+ "port-type": "port type",
+ "port-sub-slot": "port subslot",
+ "port-name": "port name"
+ },
+ "lgx": {
+ "lgx-port-name": "lgx port name",
+ "lgx-port-shelf": "lgx port shelf",
+ "lgx-port-rack": "lgx port rack",
+ "lgx-device-name": "lgx device name"
+ }
+ },
+ "rx-direction": {
+ "port": {
+ "port-slot": "port slot",
+ "port-device-name": "device name",
+ "port-rack": "port rack",
+ "port-shelf": "port shelf",
+ "port-type": "port type",
+ "port-sub-slot": "port subslot",
+ "port-name": "port name"
+ },
+ "lgx": {
+ "lgx-port-name": "lgx port name",
+ "lgx-port-shelf": "lgx port shelf",
+ "lgx-port-rack": "lgx port rack",
+ "lgx-device-name": "lgx device name"
+ }
+ }
+ },
+ "message": "message",
+ "response-failed": ""
+ }
+}
--- /dev/null
+{"notification-service":{"service-name":"service1","service-a-end":{"service-format":"OC","node-id":"XPONDER-1-2","service-rate":1,"clli":"clli","tx-direction":{"port":{"port-slot":"port slot","port-device-name":"device name","port-rack":"port rack","port-shelf":"port shelf","port-type":"port type","port-sub-slot":"port subslot","port-name":"port name"},"lgx":{"lgx-port-name":"lgx port name","lgx-port-shelf":"lgx port shelf","lgx-port-rack":"lgx port rack","lgx-device-name":"lgx device name"}},"rx-direction":{"port":{"port-slot":"port slot","port-device-name":"device name","port-rack":"port rack","port-shelf":"port shelf","port-type":"port type","port-sub-slot":"port subslot","port-name":"port name"},"lgx":{"lgx-port-name":"lgx port name","lgx-port-shelf":"lgx port shelf","lgx-port-rack":"lgx port rack","lgx-device-name":"lgx device name"}}},"common-id":"commond-id","operational-state":"inService","connection-type":"service","service-z-end":{"service-format":"OC","node-id":"XPONDER-1-2","service-rate":1,"clli":"clli","tx-direction":{"port":{"port-slot":"port slot","port-device-name":"device name","port-rack":"port rack","port-shelf":"port shelf","port-type":"port type","port-sub-slot":"port subslot","port-name":"port name"},"lgx":{"lgx-port-name":"lgx port name","lgx-port-shelf":"lgx port shelf","lgx-port-rack":"lgx port rack","lgx-device-name":"lgx device name"}},"rx-direction":{"port":{"port-slot":"port slot","port-device-name":"device name","port-rack":"port rack","port-shelf":"port shelf","port-type":"port type","port-sub-slot":"port subslot","port-name":"port name"},"lgx":{"lgx-port-name":"lgx port name","lgx-port-shelf":"lgx port shelf","lgx-port-rack":"lgx port rack","lgx-device-name":"lgx device name"}}},"message":"message","response-failed":""}}
\ No newline at end of file
--- /dev/null
+#Kafka Producer/AdminClient properties
+bootstrap.servers=localhost:9092
+acks=all
+retries=3
+max.in.flight.requests.per.connection=1
+batch.size=16384
+linger.ms=1
+buffer.memory=33554432
--- /dev/null
+#Kafka Consumer properties
+bootstrap.servers=localhost:9092
+enable.auto.commit=true
+auto.commit.interval.ms=1000
+auto.offset.reset=earliest
<module>pce</module>
<module>servicehandler</module>
<module>tapi</module>
+ <module>nbinotifications</module>
<module>features</module>
<module>karaf</module>
</modules>