From 1c4b0ba3a2ca77a6acfcd41453b7c9c0f2f2480d Mon Sep 17 00:00:00 2001 From: Thierry Jiao Date: Mon, 14 Jun 2021 14:56:15 +0200 Subject: [PATCH] Add service listener to notify Kafka - Implement a new listener 'ServiceListener' that sends a notification to the topic 'alarmservice' of the Kafka broker when a service breakdowns or is restored - Add a new RPC API named 'GetNotificationsAlarmService' to retrieve notifications from the topic 'alarmservice' - Add a new subscriber 'SubscriberAlarm' dedicated to read alarm notifications from topics Kafka - Add a new publisher 'PublisherAlarm' dedicated to write alarm notifications from topics Kafka - Update nbi-notifications unit tests JIRA: TRNSPRTPCE-471 Signed-off-by: Thierry Jiao Change-Id: I4b42c8a3282a805791ae6d9d046031dd385faaae --- ...yang => nbi-notifications@2021-06-28.yang} | 79 +++++++++++++++- .../client/impl/DmaapClientProvider.java | 2 +- .../NbiNotificationsListenerImpl.java | 10 +- .../dmaap/client/resource/EventsApi.java | 2 +- .../PublishNotificationServiceModule.java | 6 +- .../PublishNotificationServiceSerializer.java | 2 +- .../config/ServiceAEndSerializer.java | 2 +- .../config/ServiceZEndSerializer.java | 2 +- .../NbiNotificationsListenerImplTest.java | 10 +- .../tpce/module/TransportPCEImpl.java | 14 ++- .../controllers/tpce/utils/TPCEUtils.java | 2 +- .../nbinotifications/consumer/Subscriber.java | 4 +- .../consumer/SubscriberAlarm.java | 73 +++++++++++++++ .../impl/NbiNotificationsImpl.java | 45 +++++++-- .../impl/NbiNotificationsProvider.java | 40 +++++--- .../NbiNotificationsListenerImpl.java | 43 ++++++--- .../nbinotifications/producer/Publisher.java | 2 +- .../producer/PublisherAlarm.java | 61 ++++++++++++ .../NotificationAlarmServiceDeserializer.java | 65 +++++++++++++ .../NotificationAlarmServiceSerializer.java | 55 +++++++++++ .../NotificationServiceDeserializer.java | 12 +-- .../NotificationServiceSerializer.java | 2 +- .../blueprint/nobinotifications-blueprint.xml | 5 + .../consumer/SubscriberTest.java | 2 +- .../impl/NbiNotificationsImplTest.java | 11 ++- .../impl/NbiNotificationsProviderTest.java | 4 +- .../NbiNotificationsListenerImplTest.java | 13 ++- .../producer/PublisherTest.java | 2 +- .../NotificationServiceDeserializerTest.java | 4 +- .../NotificationServiceSerializerTest.java | 6 +- .../utils/NotificationServiceDataUtils.java | 14 +-- .../src/test/resources/event.json | 10 +- .../src/test/resources/expected_event.json | 91 +++++++++++++++++- .../impl/ServicehandlerImpl.java | 8 +- .../impl/ServicehandlerProvider.java | 16 +++- .../listeners/PceListenerImpl.java | 8 +- .../listeners/RendererListenerImpl.java | 8 +- .../listeners/ServiceListener.java | 92 +++++++++++++++++++ .../blueprint/servicehandler-blueprint.xml | 6 ++ .../impl/ServicehandlerProviderTest.java | 9 +- 40 files changed, 733 insertions(+), 109 deletions(-) rename api/src/main/yang/{nbi-notifications@2020-11-30.yang => nbi-notifications@2021-06-28.yang} (58%) create mode 100644 nbinotifications/src/main/java/org/opendaylight/transportpce/nbinotifications/consumer/SubscriberAlarm.java create mode 100644 nbinotifications/src/main/java/org/opendaylight/transportpce/nbinotifications/producer/PublisherAlarm.java create mode 100644 nbinotifications/src/main/java/org/opendaylight/transportpce/nbinotifications/serialization/NotificationAlarmServiceDeserializer.java create mode 100644 nbinotifications/src/main/java/org/opendaylight/transportpce/nbinotifications/serialization/NotificationAlarmServiceSerializer.java create mode 100644 servicehandler/src/main/java/org/opendaylight/transportpce/servicehandler/listeners/ServiceListener.java diff --git a/api/src/main/yang/nbi-notifications@2020-11-30.yang b/api/src/main/yang/nbi-notifications@2021-06-28.yang similarity index 58% rename from api/src/main/yang/nbi-notifications@2020-11-30.yang rename to api/src/main/yang/nbi-notifications@2021-06-28.yang index aa068cd53..d61c3993a 100644 --- a/api/src/main/yang/nbi-notifications@2020-11-30.yang +++ b/api/src/main/yang/nbi-notifications@2021-06-28.yang @@ -24,6 +24,11 @@ module nbi-notifications { "YANG definitions for using REST API in NBI notifications module. Copyright (c) 2020 ORANGE and others. All rights reserved."; + revision 2021-06-28 { + description + "Implement new models, RPC for service alarms"; + } + revision 2020-11-30 { description "Initial revision of NBI notifications"; @@ -72,14 +77,46 @@ module nbi-notifications { } } + grouping notification-alarm-service { + leaf message { + type string; + mandatory true; + description + "Message for the specified service"; + } + leaf service-name { + type string; + mandatory true; + description + "Identifier for the service to be created in the ROADM network, e.g., CLFI, CLCI, etc. + This is reported against the service, but may not get reflected in the service in the network."; + } + leaf connection-type { + type org-openroadm-common-service-types:connection-type; + mandatory true; + } + leaf operational-state { + type org-openroadm-common-state-types:state; + config false; + description + "Operational State: Actual state of service"; + } + } + container notification-service { description "Model used to send a notification from a service request"; uses notification-service; } + container notification-alarm-service { + description + "Model used to send a notification from the service listener"; + uses notification-alarm-service; + } + rpc get-notifications-service { - description "Get the notifications service send by ServiceHandler by filtering through connection type"; + description "Get the notifications service sent by ServiceHandler through filtering connection type"; input { leaf connection-type { type org-openroadm-common-service-types:connection-type; @@ -107,6 +144,35 @@ module nbi-notifications { } } + rpc get-notifications-alarm-service { + description "Get the notifications alarm service sent by ServiceListener through filtering connection type"; + input { + leaf connection-type { + type org-openroadm-common-service-types:connection-type; + mandatory true; + description + "Type connection of the service"; + } + leaf id-consumer { + type string; + mandatory true; + description + "Unique ID for the consumer"; + } + leaf group-id { + type string; + mandatory true; + description + "ID Group for the consumer"; + } + } + output { + list notification-alarm-service { + uses notification-alarm-service; + } + } + } + notification publish-notification-service { description "Publish the notifications service for topic"; leaf topic { @@ -117,4 +183,15 @@ module nbi-notifications { } uses notification-service; } + + notification publish-notification-alarm-service { + description "Publish the notifications service alarm for topic"; + leaf topic { + type string; + mandatory true; + description + "Topic where to send the notification service alarm"; + } + uses notification-alarm-service; + } } diff --git a/dmaap-client/src/main/java/org/opendaylight/transportpce/dmaap/client/impl/DmaapClientProvider.java b/dmaap-client/src/main/java/org/opendaylight/transportpce/dmaap/client/impl/DmaapClientProvider.java index adc4d062f..1da61d040 100644 --- a/dmaap-client/src/main/java/org/opendaylight/transportpce/dmaap/client/impl/DmaapClientProvider.java +++ b/dmaap-client/src/main/java/org/opendaylight/transportpce/dmaap/client/impl/DmaapClientProvider.java @@ -9,7 +9,7 @@ package org.opendaylight.transportpce.dmaap.client.impl; import org.opendaylight.mdsal.binding.api.NotificationService; import org.opendaylight.transportpce.dmaap.client.listener.NbiNotificationsListenerImpl; -import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.NbiNotificationsListener; +import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.NbiNotificationsListener; import org.opendaylight.yangtools.concepts.ListenerRegistration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/dmaap-client/src/main/java/org/opendaylight/transportpce/dmaap/client/listener/NbiNotificationsListenerImpl.java b/dmaap-client/src/main/java/org/opendaylight/transportpce/dmaap/client/listener/NbiNotificationsListenerImpl.java index ed84dae96..d5003ce12 100644 --- a/dmaap-client/src/main/java/org/opendaylight/transportpce/dmaap/client/listener/NbiNotificationsListenerImpl.java +++ b/dmaap-client/src/main/java/org/opendaylight/transportpce/dmaap/client/listener/NbiNotificationsListenerImpl.java @@ -17,8 +17,9 @@ import org.glassfish.jersey.logging.LoggingFeature; import org.opendaylight.transportpce.dmaap.client.resource.EventsApi; import org.opendaylight.transportpce.dmaap.client.resource.config.JsonConfigurator; import org.opendaylight.transportpce.dmaap.client.resource.model.CreatedEvent; -import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.NbiNotificationsListener; -import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.PublishNotificationService; +import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.NbiNotificationsListener; +import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.PublishNotificationAlarmService; +import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.PublishNotificationService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -52,4 +53,9 @@ public class NbiNotificationsListenerImpl implements NbiNotificationsListener { } + @Override + public void onPublishNotificationAlarmService(PublishNotificationAlarmService notification) { + + } + } diff --git a/dmaap-client/src/main/java/org/opendaylight/transportpce/dmaap/client/resource/EventsApi.java b/dmaap-client/src/main/java/org/opendaylight/transportpce/dmaap/client/resource/EventsApi.java index e9d936cba..b1a5ae036 100644 --- a/dmaap-client/src/main/java/org/opendaylight/transportpce/dmaap/client/resource/EventsApi.java +++ b/dmaap-client/src/main/java/org/opendaylight/transportpce/dmaap/client/resource/EventsApi.java @@ -14,7 +14,7 @@ import javax.ws.rs.PathParam; import javax.ws.rs.Produces; import javax.ws.rs.core.MediaType; import org.opendaylight.transportpce.dmaap.client.resource.model.CreatedEvent; -import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.PublishNotificationService; +import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.PublishNotificationService; @Path("/events") public interface EventsApi { diff --git a/dmaap-client/src/main/java/org/opendaylight/transportpce/dmaap/client/resource/config/PublishNotificationServiceModule.java b/dmaap-client/src/main/java/org/opendaylight/transportpce/dmaap/client/resource/config/PublishNotificationServiceModule.java index 6b615576a..b33de94ea 100644 --- a/dmaap-client/src/main/java/org/opendaylight/transportpce/dmaap/client/resource/config/PublishNotificationServiceModule.java +++ b/dmaap-client/src/main/java/org/opendaylight/transportpce/dmaap/client/resource/config/PublishNotificationServiceModule.java @@ -13,9 +13,9 @@ import org.opendaylight.yang.gen.v1.http.org.openroadm.common.service.types.rev1 import org.opendaylight.yang.gen.v1.http.org.openroadm.common.service.types.rev190531.service.endpoint.TxDirection; import org.opendaylight.yang.gen.v1.http.org.openroadm.common.service.types.rev190531.service.lgx.Lgx; import org.opendaylight.yang.gen.v1.http.org.openroadm.common.service.types.rev190531.service.port.Port; -import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.PublishNotificationService; -import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.notification.service.ServiceAEnd; -import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.notification.service.ServiceZEnd; +import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.PublishNotificationService; +import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.notification.service.ServiceAEnd; +import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.notification.service.ServiceZEnd; //This class is a temporary workaround while waiting jackson //support in yang tools https://git.opendaylight.org/gerrit/c/yangtools/+/94852 diff --git a/dmaap-client/src/main/java/org/opendaylight/transportpce/dmaap/client/resource/config/PublishNotificationServiceSerializer.java b/dmaap-client/src/main/java/org/opendaylight/transportpce/dmaap/client/resource/config/PublishNotificationServiceSerializer.java index 0ed9b7b4d..0b282b4c9 100644 --- a/dmaap-client/src/main/java/org/opendaylight/transportpce/dmaap/client/resource/config/PublishNotificationServiceSerializer.java +++ b/dmaap-client/src/main/java/org/opendaylight/transportpce/dmaap/client/resource/config/PublishNotificationServiceSerializer.java @@ -11,7 +11,7 @@ import com.fasterxml.jackson.core.JsonGenerator; import com.fasterxml.jackson.databind.SerializerProvider; import com.fasterxml.jackson.databind.ser.std.StdSerializer; import java.io.IOException; -import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.PublishNotificationService; +import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.PublishNotificationService; // This class is a temporary workaround while waiting jackson // support in yang tools https://git.opendaylight.org/gerrit/c/yangtools/+/94852 diff --git a/dmaap-client/src/main/java/org/opendaylight/transportpce/dmaap/client/resource/config/ServiceAEndSerializer.java b/dmaap-client/src/main/java/org/opendaylight/transportpce/dmaap/client/resource/config/ServiceAEndSerializer.java index 55bfaea7d..8e28cb46d 100644 --- a/dmaap-client/src/main/java/org/opendaylight/transportpce/dmaap/client/resource/config/ServiceAEndSerializer.java +++ b/dmaap-client/src/main/java/org/opendaylight/transportpce/dmaap/client/resource/config/ServiceAEndSerializer.java @@ -11,7 +11,7 @@ import com.fasterxml.jackson.core.JsonGenerator; import com.fasterxml.jackson.databind.SerializerProvider; import com.fasterxml.jackson.databind.ser.std.StdSerializer; import java.io.IOException; -import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.notification.service.ServiceAEnd; +import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.notification.service.ServiceAEnd; // This class is a temporary workaround while waiting jackson // support in yang tools https://git.opendaylight.org/gerrit/c/yangtools/+/94852 diff --git a/dmaap-client/src/main/java/org/opendaylight/transportpce/dmaap/client/resource/config/ServiceZEndSerializer.java b/dmaap-client/src/main/java/org/opendaylight/transportpce/dmaap/client/resource/config/ServiceZEndSerializer.java index 1d65b1107..485d00ac5 100644 --- a/dmaap-client/src/main/java/org/opendaylight/transportpce/dmaap/client/resource/config/ServiceZEndSerializer.java +++ b/dmaap-client/src/main/java/org/opendaylight/transportpce/dmaap/client/resource/config/ServiceZEndSerializer.java @@ -11,7 +11,7 @@ import com.fasterxml.jackson.core.JsonGenerator; import com.fasterxml.jackson.databind.SerializerProvider; import com.fasterxml.jackson.databind.ser.std.StdSerializer; import java.io.IOException; -import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.notification.service.ServiceZEnd; +import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.notification.service.ServiceZEnd; // This class is a temporary workaround while waiting jackson // support in yang tools https://git.opendaylight.org/gerrit/c/yangtools/+/94852 diff --git a/dmaap-client/src/test/java/org/opendaylight/transportpce/dmaap/client/listener/NbiNotificationsListenerImplTest.java b/dmaap-client/src/test/java/org/opendaylight/transportpce/dmaap/client/listener/NbiNotificationsListenerImplTest.java index 710bf1b31..39bdc0c56 100644 --- a/dmaap-client/src/test/java/org/opendaylight/transportpce/dmaap/client/listener/NbiNotificationsListenerImplTest.java +++ b/dmaap-client/src/test/java/org/opendaylight/transportpce/dmaap/client/listener/NbiNotificationsListenerImplTest.java @@ -24,11 +24,11 @@ import org.opendaylight.yang.gen.v1.http.org.openroadm.common.service.types.rev1 import org.opendaylight.yang.gen.v1.http.org.openroadm.common.service.types.rev190531.service.endpoint.TxDirectionBuilder; import org.opendaylight.yang.gen.v1.http.org.openroadm.common.state.types.rev181130.State; import org.opendaylight.yang.gen.v1.http.org.openroadm.service.format.rev190531.ServiceFormat; -import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.NbiNotificationsListener; -import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.PublishNotificationService; -import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.PublishNotificationServiceBuilder; -import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.notification.service.ServiceAEndBuilder; -import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.notification.service.ServiceZEndBuilder; +import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.NbiNotificationsListener; +import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.PublishNotificationService; +import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.PublishNotificationServiceBuilder; +import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.notification.service.ServiceAEndBuilder; +import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.notification.service.ServiceZEndBuilder; import org.opendaylight.yangtools.yang.common.Uint32; import org.slf4j.LoggerFactory; diff --git a/lighty/src/main/java/io/lighty/controllers/tpce/module/TransportPCEImpl.java b/lighty/src/main/java/io/lighty/controllers/tpce/module/TransportPCEImpl.java index d18791064..7ea47d278 100644 --- a/lighty/src/main/java/io/lighty/controllers/tpce/module/TransportPCEImpl.java +++ b/lighty/src/main/java/io/lighty/controllers/tpce/module/TransportPCEImpl.java @@ -75,6 +75,7 @@ import org.opendaylight.transportpce.servicehandler.impl.ServicehandlerProvider; import org.opendaylight.transportpce.servicehandler.listeners.NetworkModelListenerImpl; import org.opendaylight.transportpce.servicehandler.listeners.PceListenerImpl; import org.opendaylight.transportpce.servicehandler.listeners.RendererListenerImpl; +import org.opendaylight.transportpce.servicehandler.listeners.ServiceListener; import org.opendaylight.transportpce.servicehandler.service.ServiceDataStoreOperations; import org.opendaylight.transportpce.servicehandler.service.ServiceDataStoreOperationsImpl; import org.opendaylight.transportpce.tapi.R2RTapiLinkDiscovery; @@ -121,8 +122,9 @@ public class TransportPCEImpl extends AbstractLightyModule implements TransportP /** * List of publisher topics. */ - private final List publisherTopicList = - Arrays.asList("PceListener", "ServiceHandlerOperations", "ServiceHandler", "RendererListener"); + private final List publisherTopicList = Arrays.asList("PceListener", "ServiceHandlerOperations", + "ServiceHandler", "RendererListener"); + private final List publisherTopicAlarmList = Arrays.asList("ServiceListener"); public TransportPCEImpl(LightyServices lightyServices, boolean activateNbiNotification) { LOG.info("Initializing transaction providers ..."); @@ -192,6 +194,8 @@ public class TransportPCEImpl extends AbstractLightyModule implements TransportP lightyServices.getBindingNotificationPublishService(), networkModelService); PceListenerImpl pceListenerImpl = new PceListenerImpl(rendererServiceOperations, pathComputationService, lightyServices.getBindingNotificationPublishService(), serviceDataStoreOperations); + ServiceListener serviceListener = new ServiceListener(lightyServices.getBindingDataBroker(), + lightyServices.getBindingNotificationPublishService()); NetworkModelListenerImpl networkModelListenerImpl = new NetworkModelListenerImpl( lightyServices.getBindingNotificationPublishService(), serviceDataStoreOperations); ServicehandlerImpl servicehandler = new ServicehandlerImpl(lightyServices.getBindingDataBroker(), @@ -199,8 +203,8 @@ public class TransportPCEImpl extends AbstractLightyModule implements TransportP pceListenerImpl, rendererListenerImpl, networkModelListenerImpl, serviceDataStoreOperations, "N/A"); servicehandlerProvider = new ServicehandlerProvider(lightyServices.getBindingDataBroker(), lightyServices.getRpcProviderService(), lightyServices.getNotificationService(), - serviceDataStoreOperations, pceListenerImpl, rendererListenerImpl, networkModelListenerImpl, - servicehandler); + serviceDataStoreOperations, pceListenerImpl, serviceListener, rendererListenerImpl, + networkModelListenerImpl, servicehandler); LOG.info("Creating tapi beans ..."); R2RTapiLinkDiscovery tapilinkDiscoveryImpl = new R2RTapiLinkDiscovery(lightyServices.getBindingDataBroker(), @@ -225,7 +229,7 @@ public class TransportPCEImpl extends AbstractLightyModule implements TransportP if (activateNbiNotification) { LOG.info("Creating nbi-notifications beans ..."); nbiNotificationsProvider = new NbiNotificationsProvider( - publisherTopicList, null, null, lightyServices.getRpcProviderService(), + publisherTopicList, publisherTopicAlarmList, null, null, lightyServices.getRpcProviderService(), lightyServices.getNotificationService(), lightyServices.getAdapterContext().currentSerializer()); } } diff --git a/lighty/src/main/java/io/lighty/controllers/tpce/utils/TPCEUtils.java b/lighty/src/main/java/io/lighty/controllers/tpce/utils/TPCEUtils.java index 489bf8804..2b2575c30 100644 --- a/lighty/src/main/java/io/lighty/controllers/tpce/utils/TPCEUtils.java +++ b/lighty/src/main/java/io/lighty/controllers/tpce/utils/TPCEUtils.java @@ -348,7 +348,7 @@ public final class TPCEUtils { .getInstance(), org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.tapi.rev180928.$YangModuleInfoImpl .getInstance(), - org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.$YangModuleInfoImpl + org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.$YangModuleInfoImpl .getInstance()); private static final Set TPCE_YANG_MODEL = Stream.concat( diff --git a/nbinotifications/src/main/java/org/opendaylight/transportpce/nbinotifications/consumer/Subscriber.java b/nbinotifications/src/main/java/org/opendaylight/transportpce/nbinotifications/consumer/Subscriber.java index cb044b126..735c6a8d2 100644 --- a/nbinotifications/src/main/java/org/opendaylight/transportpce/nbinotifications/consumer/Subscriber.java +++ b/nbinotifications/src/main/java/org/opendaylight/transportpce/nbinotifications/consumer/Subscriber.java @@ -23,7 +23,7 @@ import org.opendaylight.transportpce.common.converter.JsonStringConverter; import org.opendaylight.transportpce.nbinotifications.serialization.ConfigConstants; import org.opendaylight.transportpce.nbinotifications.serialization.NotificationServiceDeserializer; import org.opendaylight.transportpce.nbinotifications.utils.NbiNotificationsUtils; -import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.get.notifications.service.output.NotificationService; +import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.get.notifications.service.output.NotificationService; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -35,7 +35,7 @@ public class Subscriber { public Subscriber(String id, String groupId, String suscriberServer, JsonStringConverter deserializer) { + .nbi.notifications.rev210628.NotificationService> deserializer) { Properties propsConsumer = NbiNotificationsUtils.loadProperties("subscriber.properties"); propsConsumer.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); propsConsumer.put(ConsumerConfig.CLIENT_ID_CONFIG, id); diff --git a/nbinotifications/src/main/java/org/opendaylight/transportpce/nbinotifications/consumer/SubscriberAlarm.java b/nbinotifications/src/main/java/org/opendaylight/transportpce/nbinotifications/consumer/SubscriberAlarm.java new file mode 100644 index 000000000..f3bf9dfd0 --- /dev/null +++ b/nbinotifications/src/main/java/org/opendaylight/transportpce/nbinotifications/consumer/SubscriberAlarm.java @@ -0,0 +1,73 @@ +/* + * Copyright © 2021 Orange, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.transportpce.nbinotifications.consumer; + +import com.google.common.annotations.VisibleForTesting; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Properties; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.opendaylight.transportpce.common.converter.JsonStringConverter; +import org.opendaylight.transportpce.nbinotifications.serialization.ConfigConstants; +import org.opendaylight.transportpce.nbinotifications.serialization.NotificationAlarmServiceDeserializer; +import org.opendaylight.transportpce.nbinotifications.utils.NbiNotificationsUtils; +import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.get.notifications.alarm.service.output.NotificationAlarmService; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class SubscriberAlarm { + private static final Logger LOG = LoggerFactory.getLogger(SubscriberAlarm.class); + + private final Consumer consumer; + + public SubscriberAlarm(String id, String groupId, String subscriberServer, + JsonStringConverter deserializer) { + Properties propsConsumer = NbiNotificationsUtils.loadProperties("subscriber.properties"); + propsConsumer.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); + propsConsumer.put(ConsumerConfig.CLIENT_ID_CONFIG, id); + propsConsumer.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + propsConsumer.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG , NotificationAlarmServiceDeserializer.class); + propsConsumer.put(ConfigConstants.CONVERTER , deserializer); + if (subscriberServer != null && !subscriberServer.isBlank()) { + propsConsumer.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, subscriberServer); + } + LOG.info("Subscribing for group id {}, client config id {} with properties {}", groupId, id, propsConsumer); + consumer = new KafkaConsumer<>(propsConsumer); + } + + public List subscribeAlarm(String topicName) { + LOG.info("Subscribe request to topic '{}' ", topicName); + consumer.subscribe(Collections.singleton(topicName)); + final ConsumerRecords consumerRecords = consumer + .poll(Duration.ofMillis(1000)); + List notificationAlarmServiceList = new ArrayList<>(); + YangInstanceIdentifier.of(NotificationAlarmService.QNAME); + for (ConsumerRecord record : consumerRecords) { + if (record.value() != null) { + notificationAlarmServiceList.add(record.value()); + } + } + LOG.info("Getting records '{}' ", notificationAlarmServiceList); + consumer.unsubscribe(); + consumer.close(); + return notificationAlarmServiceList; + } + + @VisibleForTesting public SubscriberAlarm(Consumer consumer) { + this.consumer = consumer; + } +} diff --git a/nbinotifications/src/main/java/org/opendaylight/transportpce/nbinotifications/impl/NbiNotificationsImpl.java b/nbinotifications/src/main/java/org/opendaylight/transportpce/nbinotifications/impl/NbiNotificationsImpl.java index 073ce49f8..d7201d87a 100644 --- a/nbinotifications/src/main/java/org/opendaylight/transportpce/nbinotifications/impl/NbiNotificationsImpl.java +++ b/nbinotifications/src/main/java/org/opendaylight/transportpce/nbinotifications/impl/NbiNotificationsImpl.java @@ -11,11 +11,16 @@ import com.google.common.util.concurrent.ListenableFuture; import java.util.List; import org.opendaylight.transportpce.common.converter.JsonStringConverter; import org.opendaylight.transportpce.nbinotifications.consumer.Subscriber; -import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.GetNotificationsServiceInput; -import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.GetNotificationsServiceOutput; -import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.GetNotificationsServiceOutputBuilder; -import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.NbiNotificationsService; -import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.get.notifications.service.output.NotificationService; +import org.opendaylight.transportpce.nbinotifications.consumer.SubscriberAlarm; +import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.GetNotificationsAlarmServiceInput; +import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.GetNotificationsAlarmServiceOutput; +import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.GetNotificationsAlarmServiceOutputBuilder; +import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.GetNotificationsServiceInput; +import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.GetNotificationsServiceOutput; +import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.GetNotificationsServiceOutputBuilder; +import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.NbiNotificationsService; +import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.get.notifications.alarm.service.output.NotificationAlarmService; +import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.get.notifications.service.output.NotificationService; import org.opendaylight.yangtools.yang.common.RpcResult; import org.opendaylight.yangtools.yang.common.RpcResultBuilder; import org.slf4j.Logger; @@ -24,12 +29,17 @@ import org.slf4j.LoggerFactory; public class NbiNotificationsImpl implements NbiNotificationsService { private static final Logger LOG = LoggerFactory.getLogger(NbiNotificationsImpl.class); private final JsonStringConverter converter; + .nbi.notifications.rev210628.NotificationService> converterService; + private final JsonStringConverter converterAlarmService; private final String server; public NbiNotificationsImpl(JsonStringConverter converter, String server) { - this.converter = converter; + .nbi.notifications.rev210628.NotificationService> converterService, + JsonStringConverter converterAlarmService, String server) { + this.converterService = converterService; + this.converterAlarmService = converterAlarmService; this.server = server; } @@ -41,11 +51,28 @@ public class NbiNotificationsImpl implements NbiNotificationsService { LOG.warn("Missing mandatory params for input {}", input); return RpcResultBuilder.success(new GetNotificationsServiceOutputBuilder().build()).buildFuture(); } - Subscriber subscriber = new Subscriber(input.getIdConsumer(), input.getGroupId(), server, converter); + Subscriber subscriber = new Subscriber(input.getIdConsumer(), input.getGroupId(), server, converterService); List notificationServiceList = subscriber .subscribeService(input.getConnectionType().getName()); GetNotificationsServiceOutputBuilder output = new GetNotificationsServiceOutputBuilder() .setNotificationService(notificationServiceList); return RpcResultBuilder.success(output.build()).buildFuture(); } + + @Override + public ListenableFuture> getNotificationsAlarmService( + GetNotificationsAlarmServiceInput input) { + LOG.info("RPC getNotificationsAlarmService received"); + if (input == null || input.getIdConsumer() == null || input.getGroupId() == null) { + LOG.warn("Missing mandatory params for input {}", input); + return RpcResultBuilder.success(new GetNotificationsAlarmServiceOutputBuilder().build()).buildFuture(); + } + SubscriberAlarm subscriberAlarm = new SubscriberAlarm(input.getIdConsumer(), input.getGroupId(), server, + converterAlarmService); + List notificationAlarmServiceList = subscriberAlarm + .subscribeAlarm("alarm" + input.getConnectionType().getName()); + GetNotificationsAlarmServiceOutputBuilder output = new GetNotificationsAlarmServiceOutputBuilder() + .setNotificationAlarmService(notificationAlarmServiceList); + return RpcResultBuilder.success(output.build()).buildFuture(); + } } diff --git a/nbinotifications/src/main/java/org/opendaylight/transportpce/nbinotifications/impl/NbiNotificationsProvider.java b/nbinotifications/src/main/java/org/opendaylight/transportpce/nbinotifications/impl/NbiNotificationsProvider.java index 5f4103446..327841653 100644 --- a/nbinotifications/src/main/java/org/opendaylight/transportpce/nbinotifications/impl/NbiNotificationsProvider.java +++ b/nbinotifications/src/main/java/org/opendaylight/transportpce/nbinotifications/impl/NbiNotificationsProvider.java @@ -16,8 +16,9 @@ import org.opendaylight.mdsal.binding.dom.codec.spi.BindingDOMCodecServices; import org.opendaylight.transportpce.common.converter.JsonStringConverter; import org.opendaylight.transportpce.nbinotifications.listener.NbiNotificationsListenerImpl; import org.opendaylight.transportpce.nbinotifications.producer.Publisher; -import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.NbiNotificationsListener; -import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.NbiNotificationsService; +import org.opendaylight.transportpce.nbinotifications.producer.PublisherAlarm; +import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.NbiNotificationsListener; +import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.NbiNotificationsService; import org.opendaylight.yangtools.concepts.ListenerRegistration; import org.opendaylight.yangtools.concepts.ObjectRegistration; import org.slf4j.Logger; @@ -26,29 +27,37 @@ import org.slf4j.LoggerFactory; public class NbiNotificationsProvider { private static final Logger LOG = LoggerFactory.getLogger(NbiNotificationsProvider.class); - private static Map publishersMap = new HashMap<>(); + private static Map publishersServiceMap = new HashMap<>(); + private static Map publishersAlarmMap = new HashMap<>(); private final RpcProviderService rpcService; private ObjectRegistration rpcRegistration; private ListenerRegistration listenerRegistration; private NotificationService notificationService; private final JsonStringConverter converter; - private final String suscriberServer; + .nbi.notifications.rev210628.NotificationService> converterService; + private final JsonStringConverter converterAlarmService; + private final String subscriberServer; - public NbiNotificationsProvider(List topics, - String suscriberServer, String publisherServer, + public NbiNotificationsProvider(List topicsService, List topicsAlarm, + String subscriberServer, String publisherServer, RpcProviderService rpcProviderService, NotificationService notificationService, BindingDOMCodecServices bindingDOMCodecServices) { this.rpcService = rpcProviderService; this.notificationService = notificationService; - converter = new JsonStringConverter<>(bindingDOMCodecServices); - for (String topic: topics) { + converterService = new JsonStringConverter<>(bindingDOMCodecServices); + for (String topic: topicsService) { + LOG.info("Creating publisher for topic {}", topic); + publishersServiceMap.put(topic, new Publisher(topic, publisherServer, converterService)); + } + converterAlarmService = new JsonStringConverter<>(bindingDOMCodecServices); + for (String topic: topicsAlarm) { LOG.info("Creating publisher for topic {}", topic); - publishersMap.put(topic, new Publisher(topic, publisherServer, converter)); + publishersAlarmMap.put(topic, new PublisherAlarm(topic, publisherServer, converterAlarmService)); } - this.suscriberServer = suscriberServer; + this.subscriberServer = subscriberServer; } /** @@ -57,18 +66,21 @@ public class NbiNotificationsProvider { public void init() { LOG.info("NbiNotificationsProvider Session Initiated"); rpcRegistration = rpcService.registerRpcImplementation(NbiNotificationsService.class, - new NbiNotificationsImpl(converter, suscriberServer)); + new NbiNotificationsImpl(converterService, converterAlarmService, subscriberServer)); listenerRegistration = notificationService.registerNotificationListener( - new NbiNotificationsListenerImpl(publishersMap)); + new NbiNotificationsListenerImpl(publishersServiceMap, publishersAlarmMap)); } /** * Method called when the blueprint container is destroyed. */ public void close() { - for (Publisher publisher : publishersMap.values()) { + for (Publisher publisher : publishersServiceMap.values()) { publisher.close(); } + for (PublisherAlarm publisherAlarm : publishersAlarmMap.values()) { + publisherAlarm.close(); + } rpcRegistration.close(); listenerRegistration.close(); LOG.info("NbiNotificationsProvider Closed"); diff --git a/nbinotifications/src/main/java/org/opendaylight/transportpce/nbinotifications/listener/NbiNotificationsListenerImpl.java b/nbinotifications/src/main/java/org/opendaylight/transportpce/nbinotifications/listener/NbiNotificationsListenerImpl.java index 0bcdf6bd2..2e8427f64 100644 --- a/nbinotifications/src/main/java/org/opendaylight/transportpce/nbinotifications/listener/NbiNotificationsListenerImpl.java +++ b/nbinotifications/src/main/java/org/opendaylight/transportpce/nbinotifications/listener/NbiNotificationsListenerImpl.java @@ -7,39 +7,60 @@ */ package org.opendaylight.transportpce.nbinotifications.listener; -import java.util.HashMap; import java.util.Map; import org.opendaylight.transportpce.nbinotifications.producer.Publisher; -import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.NbiNotificationsListener; -import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.NotificationServiceBuilder; -import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.PublishNotificationService; +import org.opendaylight.transportpce.nbinotifications.producer.PublisherAlarm; +import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.NbiNotificationsListener; +import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.NotificationAlarmServiceBuilder; +import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.NotificationServiceBuilder; +import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.PublishNotificationAlarmService; +import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.PublishNotificationService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class NbiNotificationsListenerImpl implements NbiNotificationsListener { private static final Logger LOG = LoggerFactory.getLogger(NbiNotificationsListenerImpl.class); - private Map publishersMap = new HashMap<>(); + private Map publishersServiceMap; + private Map publishersAlarmMap; - public NbiNotificationsListenerImpl(Map publishersMap) { - this.publishersMap = publishersMap; + public NbiNotificationsListenerImpl(Map publishersServiceMap, + Map publishersAlarmMap) { + this.publishersServiceMap = publishersServiceMap; + this.publishersAlarmMap = publishersAlarmMap; } @Override public void onPublishNotificationService(PublishNotificationService notification) { LOG.info("Receiving request for publishing notification service"); String topic = notification.getTopic(); - if (!publishersMap.containsKey(topic)) { + if (!publishersServiceMap.containsKey(topic)) { LOG.error("Unknown topic {}", topic); return; } - Publisher publisher = publishersMap.get(topic); + Publisher publisher = publishersServiceMap.get(topic); publisher.sendEvent(new NotificationServiceBuilder().setCommonId(notification.getCommonId()) .setConnectionType(notification.getConnectionType()).setMessage(notification.getMessage()) .setOperationalState(notification.getOperationalState()) .setResponseFailed(notification.getResponseFailed()) - .setServiceAEnd(notification.getServiceAEnd()).setServiceName(notification.getServiceName()) + .setServiceAEnd(notification.getServiceAEnd()) + .setServiceName(notification.getServiceName()) .setServiceZEnd(notification.getServiceZEnd()).build()); - } + @Override + public void onPublishNotificationAlarmService(PublishNotificationAlarmService notification) { + LOG.info("Receiving request for publishing notification alarm service"); + String topic = notification.getTopic(); + if (!publishersAlarmMap.containsKey(topic)) { + LOG.error("Unknown topic {}", topic); + return; + } + PublisherAlarm publisherAlarm = publishersAlarmMap.get(topic); + publisherAlarm.sendEvent(new NotificationAlarmServiceBuilder().setConnectionType(notification + .getConnectionType()) + .setMessage(notification.getMessage()) + .setOperationalState(notification.getOperationalState()) + .setServiceName(notification.getServiceName()) + .build()); + } } diff --git a/nbinotifications/src/main/java/org/opendaylight/transportpce/nbinotifications/producer/Publisher.java b/nbinotifications/src/main/java/org/opendaylight/transportpce/nbinotifications/producer/Publisher.java index 21295a9b6..664a7c9a0 100644 --- a/nbinotifications/src/main/java/org/opendaylight/transportpce/nbinotifications/producer/Publisher.java +++ b/nbinotifications/src/main/java/org/opendaylight/transportpce/nbinotifications/producer/Publisher.java @@ -18,7 +18,7 @@ import org.opendaylight.transportpce.common.converter.JsonStringConverter; import org.opendaylight.transportpce.nbinotifications.serialization.ConfigConstants; import org.opendaylight.transportpce.nbinotifications.serialization.NotificationServiceSerializer; import org.opendaylight.transportpce.nbinotifications.utils.NbiNotificationsUtils; -import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.NotificationService; +import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.NotificationService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/nbinotifications/src/main/java/org/opendaylight/transportpce/nbinotifications/producer/PublisherAlarm.java b/nbinotifications/src/main/java/org/opendaylight/transportpce/nbinotifications/producer/PublisherAlarm.java new file mode 100644 index 000000000..b275548ca --- /dev/null +++ b/nbinotifications/src/main/java/org/opendaylight/transportpce/nbinotifications/producer/PublisherAlarm.java @@ -0,0 +1,61 @@ +/* + * Copyright © 2021 Orange, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.transportpce.nbinotifications.producer; + +import com.google.common.annotations.VisibleForTesting; +import java.util.Properties; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.serialization.StringSerializer; +import org.opendaylight.transportpce.common.converter.JsonStringConverter; +import org.opendaylight.transportpce.nbinotifications.serialization.ConfigConstants; +import org.opendaylight.transportpce.nbinotifications.serialization.NotificationAlarmServiceSerializer; +import org.opendaylight.transportpce.nbinotifications.utils.NbiNotificationsUtils; +import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.NotificationAlarmService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class PublisherAlarm { + private static final Logger LOG = LoggerFactory.getLogger(PublisherAlarm.class); + + private final String id; + private final Producer producer; + + public PublisherAlarm(String id, String publisherServer, JsonStringConverter serializer) { + Properties properties = NbiNotificationsUtils.loadProperties("publisher.properties"); + properties.put(ProducerConfig.CLIENT_ID_CONFIG, id); + if (publisherServer != null && !publisherServer.isBlank()) { + properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, publisherServer); + } + properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG , NotificationAlarmServiceSerializer.class); + properties.put(ConfigConstants.CONVERTER , serializer); + LOG.info("Creationg publisher for id {} with properties {}", id, properties); + producer = new KafkaProducer<>(properties); + this.id = id; + } + + @VisibleForTesting + PublisherAlarm(String id, Producer producer) { + this.producer = producer; + this.id = id; + } + + public void close() { + producer.close(); + } + + public void sendEvent(NotificationAlarmService notificationAlarmService) { + LOG.info("SendEvent request to topic '{}' ", notificationAlarmService.getConnectionType().getName()); + producer.send(new ProducerRecord<>("alarm" + notificationAlarmService.getConnectionType().getName(), + id, notificationAlarmService)); + producer.flush(); + } +} diff --git a/nbinotifications/src/main/java/org/opendaylight/transportpce/nbinotifications/serialization/NotificationAlarmServiceDeserializer.java b/nbinotifications/src/main/java/org/opendaylight/transportpce/nbinotifications/serialization/NotificationAlarmServiceDeserializer.java new file mode 100644 index 000000000..4612d84bc --- /dev/null +++ b/nbinotifications/src/main/java/org/opendaylight/transportpce/nbinotifications/serialization/NotificationAlarmServiceDeserializer.java @@ -0,0 +1,65 @@ +/* + * Copyright © 2021 Orange, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.transportpce.nbinotifications.serialization; + +import java.nio.charset.StandardCharsets; +import java.util.Map; +import org.apache.kafka.common.serialization.Deserializer; +import org.opendaylight.transportpce.common.converter.JsonStringConverter; +import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.get.notifications.alarm.service.output.NotificationAlarmService; +import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.get.notifications.alarm.service.output.NotificationAlarmServiceBuilder; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.codec.gson.JSONCodecFactorySupplier; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class NotificationAlarmServiceDeserializer implements Deserializer { + private static final Logger LOG = LoggerFactory.getLogger(NotificationAlarmServiceDeserializer.class); + private JsonStringConverter converter; + + @SuppressWarnings("unchecked") + @Override + public void configure(Map configs, boolean isKey) { + LOG.info("Deserializer configuration {}", configs); + if (configs.containsKey(ConfigConstants.CONVERTER) + && configs.get(ConfigConstants.CONVERTER) instanceof JsonStringConverter) { + converter = (JsonStringConverter) configs + .get(ConfigConstants.CONVERTER); + } + } + + @Override + public NotificationAlarmService deserialize(String topic, byte[] data) { + if (converter == null) { + throw new IllegalArgumentException( + "Converter should be configured through configure method of deserializer"); + } + String value = new String(data, StandardCharsets.UTF_8); + // The message published is + // org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.NotificationService + // we have to map it to + // org.opendaylight.yang.gen + // .v1.nbi.notifications.rev201130.get.notifications.service.output.NotificationService + org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.NotificationAlarmService mappedString = converter + .createDataObjectFromJsonString(YangInstanceIdentifier.of( + org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.NotificationAlarmService.QNAME), + value, + JSONCodecFactorySupplier.RFC7951); + if (mappedString != null) { + LOG.info("Reading event {}", mappedString); + return new NotificationAlarmServiceBuilder().setConnectionType(mappedString.getConnectionType()) + .setMessage(mappedString.getMessage()) + .setOperationalState(mappedString.getOperationalState()) + .setServiceName(mappedString.getServiceName()) + .build(); + } + return null; + } +} diff --git a/nbinotifications/src/main/java/org/opendaylight/transportpce/nbinotifications/serialization/NotificationAlarmServiceSerializer.java b/nbinotifications/src/main/java/org/opendaylight/transportpce/nbinotifications/serialization/NotificationAlarmServiceSerializer.java new file mode 100644 index 000000000..c2ce676ee --- /dev/null +++ b/nbinotifications/src/main/java/org/opendaylight/transportpce/nbinotifications/serialization/NotificationAlarmServiceSerializer.java @@ -0,0 +1,55 @@ +/* + * Copyright © 2021 Orange, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.transportpce.nbinotifications.serialization; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.Map; +import org.apache.kafka.common.serialization.Serializer; +import org.opendaylight.transportpce.common.converter.JsonStringConverter; +import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.NotificationAlarmService; +import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; +import org.opendaylight.yangtools.yang.data.codec.gson.JSONCodecFactorySupplier; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class NotificationAlarmServiceSerializer implements Serializer { + private static final Logger LOG = LoggerFactory.getLogger(NotificationAlarmServiceSerializer.class); + private JsonStringConverter converter; + + @SuppressWarnings("unchecked") + @Override + public void configure(Map configs, boolean isKey) { + LOG.info("Deserializer configuration {}", configs); + if (configs.containsKey(ConfigConstants.CONVERTER) + && configs.get(ConfigConstants.CONVERTER) instanceof JsonStringConverter) { + converter = (JsonStringConverter) configs.get(ConfigConstants.CONVERTER); + } + } + + @Override + public byte[] serialize(String topic, NotificationAlarmService data) { + if (converter == null) { + throw new IllegalArgumentException( + "Converter should be" + "configured through configure method of serializer"); + } + if (data == null) { + return new byte[0]; + } + try { + InstanceIdentifier iid = + InstanceIdentifier.builder(NotificationAlarmService.class).build(); + String serialized = converter.createJsonStringFromDataObject(iid, data, JSONCodecFactorySupplier.RFC7951); + LOG.info("Serialized event {}", serialized); + return serialized.getBytes(StandardCharsets.UTF_8); + } catch (IOException e) { + return new byte[0]; + } + } +} diff --git a/nbinotifications/src/main/java/org/opendaylight/transportpce/nbinotifications/serialization/NotificationServiceDeserializer.java b/nbinotifications/src/main/java/org/opendaylight/transportpce/nbinotifications/serialization/NotificationServiceDeserializer.java index 95f10fb63..81f02ac94 100644 --- a/nbinotifications/src/main/java/org/opendaylight/transportpce/nbinotifications/serialization/NotificationServiceDeserializer.java +++ b/nbinotifications/src/main/java/org/opendaylight/transportpce/nbinotifications/serialization/NotificationServiceDeserializer.java @@ -11,8 +11,8 @@ import java.nio.charset.StandardCharsets; import java.util.Map; import org.apache.kafka.common.serialization.Deserializer; import org.opendaylight.transportpce.common.converter.JsonStringConverter; -import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.get.notifications.service.output.NotificationService; -import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.get.notifications.service.output.NotificationServiceBuilder; +import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.get.notifications.service.output.NotificationService; +import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.get.notifications.service.output.NotificationServiceBuilder; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.codec.gson.JSONCodecFactorySupplier; import org.slf4j.Logger; @@ -21,7 +21,7 @@ import org.slf4j.LoggerFactory; public class NotificationServiceDeserializer implements Deserializer { private static final Logger LOG = LoggerFactory.getLogger(NotificationServiceDeserializer.class); private JsonStringConverter converter; + .nbi.notifications.rev210628.NotificationService> converter; @SuppressWarnings("unchecked") @Override @@ -30,7 +30,7 @@ public class NotificationServiceDeserializer implements Deserializer) { converter = (JsonStringConverter) configs + .nbi.notifications.rev210628.NotificationService>) configs .get(ConfigConstants.CONVERTER); } } @@ -47,9 +47,9 @@ public class NotificationServiceDeserializer implements DeserializerRendererListener + + + ServiceListener + + diff --git a/nbinotifications/src/test/java/org/opendaylight/transportpce/nbinotifications/consumer/SubscriberTest.java b/nbinotifications/src/test/java/org/opendaylight/transportpce/nbinotifications/consumer/SubscriberTest.java index de58a498a..5427020e1 100644 --- a/nbinotifications/src/test/java/org/opendaylight/transportpce/nbinotifications/consumer/SubscriberTest.java +++ b/nbinotifications/src/test/java/org/opendaylight/transportpce/nbinotifications/consumer/SubscriberTest.java @@ -22,7 +22,7 @@ import org.junit.Before; import org.junit.Test; import org.opendaylight.transportpce.nbinotifications.utils.NotificationServiceDataUtils; import org.opendaylight.transportpce.test.AbstractTest; -import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.get.notifications.service.output.NotificationService; +import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.get.notifications.service.output.NotificationService; public class SubscriberTest extends AbstractTest { private static final String TOPIC = "topic"; diff --git a/nbinotifications/src/test/java/org/opendaylight/transportpce/nbinotifications/impl/NbiNotificationsImplTest.java b/nbinotifications/src/test/java/org/opendaylight/transportpce/nbinotifications/impl/NbiNotificationsImplTest.java index da825509a..9de03934c 100644 --- a/nbinotifications/src/test/java/org/opendaylight/transportpce/nbinotifications/impl/NbiNotificationsImplTest.java +++ b/nbinotifications/src/test/java/org/opendaylight/transportpce/nbinotifications/impl/NbiNotificationsImplTest.java @@ -16,8 +16,8 @@ import org.junit.Test; import org.opendaylight.transportpce.common.converter.JsonStringConverter; import org.opendaylight.transportpce.test.AbstractTest; import org.opendaylight.yang.gen.v1.http.org.openroadm.common.service.types.rev190531.ConnectionType; -import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.GetNotificationsServiceInputBuilder; -import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.GetNotificationsServiceOutput; +import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.GetNotificationsServiceInputBuilder; +import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.GetNotificationsServiceOutput; import org.opendaylight.yangtools.yang.common.RpcResult; public class NbiNotificationsImplTest extends AbstractTest { @@ -26,9 +26,12 @@ public class NbiNotificationsImplTest extends AbstractTest { @Before public void setUp() { JsonStringConverter converter = new JsonStringConverter<>( + .nbi.notifications.rev210628.NotificationService> converter = new JsonStringConverter<>( getDataStoreContextUtil().getBindingDOMCodecServices()); - nbiNotificationsImpl = new NbiNotificationsImpl(converter, "localhost:8080"); + JsonStringConverter converterAlarm = new JsonStringConverter<>( + getDataStoreContextUtil().getBindingDOMCodecServices()); + nbiNotificationsImpl = new NbiNotificationsImpl(converter, converterAlarm,"localhost:8080"); } public void getNotificationsServiceEmptyDataTest() throws InterruptedException, ExecutionException { diff --git a/nbinotifications/src/test/java/org/opendaylight/transportpce/nbinotifications/impl/NbiNotificationsProviderTest.java b/nbinotifications/src/test/java/org/opendaylight/transportpce/nbinotifications/impl/NbiNotificationsProviderTest.java index 178fc4243..2b917aab0 100644 --- a/nbinotifications/src/test/java/org/opendaylight/transportpce/nbinotifications/impl/NbiNotificationsProviderTest.java +++ b/nbinotifications/src/test/java/org/opendaylight/transportpce/nbinotifications/impl/NbiNotificationsProviderTest.java @@ -38,8 +38,8 @@ public class NbiNotificationsProviderTest extends AbstractTest { @Test public void initTest() { NbiNotificationsProvider provider = new NbiNotificationsProvider( - Arrays.asList("topic1", "topic2"), "localhost:8080", "localhost:8080", - rpcProviderRegistry, notificationService, + Arrays.asList("topic1", "topic2"), Arrays.asList("topic1", "topic2"), "localhost:8080", + "localhost:8080", rpcProviderRegistry, notificationService, getDataStoreContextUtil().getBindingDOMCodecServices()); provider.init(); verify(rpcProviderRegistry, times(1)) diff --git a/nbinotifications/src/test/java/org/opendaylight/transportpce/nbinotifications/listener/NbiNotificationsListenerImplTest.java b/nbinotifications/src/test/java/org/opendaylight/transportpce/nbinotifications/listener/NbiNotificationsListenerImplTest.java index cb8faf242..899baed01 100644 --- a/nbinotifications/src/test/java/org/opendaylight/transportpce/nbinotifications/listener/NbiNotificationsListenerImplTest.java +++ b/nbinotifications/src/test/java/org/opendaylight/transportpce/nbinotifications/listener/NbiNotificationsListenerImplTest.java @@ -17,15 +17,18 @@ import org.junit.Test; import org.mockito.Mock; import org.mockito.MockitoAnnotations; import org.opendaylight.transportpce.nbinotifications.producer.Publisher; +import org.opendaylight.transportpce.nbinotifications.producer.PublisherAlarm; import org.opendaylight.transportpce.test.AbstractTest; import org.opendaylight.yang.gen.v1.http.org.openroadm.common.service.types.rev190531.ConnectionType; import org.opendaylight.yang.gen.v1.http.org.openroadm.common.state.types.rev181130.State; -import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.PublishNotificationService; -import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.PublishNotificationServiceBuilder; +import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.PublishNotificationService; +import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.PublishNotificationServiceBuilder; public class NbiNotificationsListenerImplTest extends AbstractTest { @Mock private Publisher publisher; + @Mock + private PublisherAlarm publisherAlarm; @Before public void setUp() { @@ -34,7 +37,8 @@ public class NbiNotificationsListenerImplTest extends AbstractTest { @Test public void onPublishNotificationServiceTest() { - NbiNotificationsListenerImpl listener = new NbiNotificationsListenerImpl(Map.of("test", publisher)); + NbiNotificationsListenerImpl listener = new NbiNotificationsListenerImpl(Map.of("test", publisher), + Map.of("test", publisherAlarm)); PublishNotificationService notification = new PublishNotificationServiceBuilder().setTopic("test") .setCommonId("commonId").setConnectionType(ConnectionType.Service).setMessage("Service deleted") .setOperationalState(State.OutOfService).setServiceName("service name").build(); @@ -44,7 +48,8 @@ public class NbiNotificationsListenerImplTest extends AbstractTest { @Test public void onPublishNotificationServiceWrongTopicTest() { - NbiNotificationsListenerImpl listener = new NbiNotificationsListenerImpl(Map.of("test", publisher)); + NbiNotificationsListenerImpl listener = new NbiNotificationsListenerImpl(Map.of("test", publisher), + Map.of("test", publisherAlarm)); PublishNotificationService notification = new PublishNotificationServiceBuilder().setTopic("wrongtopic") .setCommonId("commonId").setConnectionType(ConnectionType.Service).setMessage("Service deleted") .setOperationalState(State.OutOfService).setServiceName("service name").build(); diff --git a/nbinotifications/src/test/java/org/opendaylight/transportpce/nbinotifications/producer/PublisherTest.java b/nbinotifications/src/test/java/org/opendaylight/transportpce/nbinotifications/producer/PublisherTest.java index b3a5b2edb..f738fd71d 100644 --- a/nbinotifications/src/test/java/org/opendaylight/transportpce/nbinotifications/producer/PublisherTest.java +++ b/nbinotifications/src/test/java/org/opendaylight/transportpce/nbinotifications/producer/PublisherTest.java @@ -21,7 +21,7 @@ import org.opendaylight.transportpce.common.converter.JsonStringConverter; import org.opendaylight.transportpce.nbinotifications.serialization.ConfigConstants; import org.opendaylight.transportpce.nbinotifications.serialization.NotificationServiceSerializer; import org.opendaylight.transportpce.test.AbstractTest; -import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.NotificationService; +import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.NotificationService; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.codec.gson.JSONCodecFactorySupplier; diff --git a/nbinotifications/src/test/java/org/opendaylight/transportpce/nbinotifications/serialization/NotificationServiceDeserializerTest.java b/nbinotifications/src/test/java/org/opendaylight/transportpce/nbinotifications/serialization/NotificationServiceDeserializerTest.java index 928c2471a..430e7d8e2 100644 --- a/nbinotifications/src/test/java/org/opendaylight/transportpce/nbinotifications/serialization/NotificationServiceDeserializerTest.java +++ b/nbinotifications/src/test/java/org/opendaylight/transportpce/nbinotifications/serialization/NotificationServiceDeserializerTest.java @@ -16,13 +16,13 @@ import java.util.Map; import org.junit.Test; import org.opendaylight.transportpce.common.converter.JsonStringConverter; import org.opendaylight.transportpce.test.AbstractTest; -import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.get.notifications.service.output.NotificationService; +import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.get.notifications.service.output.NotificationService; public class NotificationServiceDeserializerTest extends AbstractTest { @Test public void deserializeTest() throws IOException { - JsonStringConverter converter = + JsonStringConverter converter = new JsonStringConverter<>(getDataStoreContextUtil().getBindingDOMCodecServices()); NotificationServiceDeserializer deserializer = new NotificationServiceDeserializer(); Map configs = Map.of(ConfigConstants.CONVERTER, converter); diff --git a/nbinotifications/src/test/java/org/opendaylight/transportpce/nbinotifications/serialization/NotificationServiceSerializerTest.java b/nbinotifications/src/test/java/org/opendaylight/transportpce/nbinotifications/serialization/NotificationServiceSerializerTest.java index 3fe658b6b..ca11dda9c 100644 --- a/nbinotifications/src/test/java/org/opendaylight/transportpce/nbinotifications/serialization/NotificationServiceSerializerTest.java +++ b/nbinotifications/src/test/java/org/opendaylight/transportpce/nbinotifications/serialization/NotificationServiceSerializerTest.java @@ -10,6 +10,8 @@ package org.opendaylight.transportpce.nbinotifications.serialization; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.nio.file.Files; @@ -18,7 +20,7 @@ import java.util.Map; import org.junit.Test; import org.opendaylight.transportpce.common.converter.JsonStringConverter; import org.opendaylight.transportpce.test.AbstractTest; -import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.NotificationService; +import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.NotificationService; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.codec.gson.JSONCodecFactorySupplier; @@ -39,6 +41,8 @@ public class NotificationServiceSerializerTest extends AbstractTest { serializer.close(); assertNotNull("Serialized data should not be null", data); String expectedJson = Files.readString(Paths.get("src/test/resources/expected_event.json")); + // Minify the json string + expectedJson = new ObjectMapper().readValue(expectedJson, JsonNode.class).toString(); assertEquals("The event should be equals", expectedJson, new String(data, StandardCharsets.UTF_8)); } } diff --git a/nbinotifications/src/test/java/org/opendaylight/transportpce/nbinotifications/utils/NotificationServiceDataUtils.java b/nbinotifications/src/test/java/org/opendaylight/transportpce/nbinotifications/utils/NotificationServiceDataUtils.java index 44b695888..0921e99c6 100644 --- a/nbinotifications/src/test/java/org/opendaylight/transportpce/nbinotifications/utils/NotificationServiceDataUtils.java +++ b/nbinotifications/src/test/java/org/opendaylight/transportpce/nbinotifications/utils/NotificationServiceDataUtils.java @@ -15,10 +15,10 @@ import org.opendaylight.yang.gen.v1.http.org.openroadm.common.service.types.rev1 import org.opendaylight.yang.gen.v1.http.org.openroadm.common.service.types.rev190531.service.port.PortBuilder; import org.opendaylight.yang.gen.v1.http.org.openroadm.common.state.types.rev181130.State; import org.opendaylight.yang.gen.v1.http.org.openroadm.service.format.rev190531.ServiceFormat; -import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.NotificationService; -import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.NotificationServiceBuilder; -import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.notification.service.ServiceAEndBuilder; -import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.notification.service.ServiceZEndBuilder; +import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.NotificationService; +import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.NotificationServiceBuilder; +import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.notification.service.ServiceAEndBuilder; +import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.notification.service.ServiceZEndBuilder; import org.opendaylight.yangtools.yang.common.Uint32; public final class NotificationServiceDataUtils { @@ -41,11 +41,11 @@ public final class NotificationServiceDataUtils { } public static org.opendaylight.yang.gen.v1 - .nbi.notifications.rev201130.get.notifications.service.output.NotificationService buildReceivedEvent() { + .nbi.notifications.rev210628.get.notifications.service.output.NotificationService buildReceivedEvent() { org.opendaylight.yang.gen.v1 - .nbi.notifications.rev201130.get.notifications.service.output.NotificationServiceBuilder + .nbi.notifications.rev210628.get.notifications.service.output.NotificationServiceBuilder notificationServiceBuilder = new org.opendaylight.yang.gen.v1 - .nbi.notifications.rev201130.get.notifications.service.output.NotificationServiceBuilder() + .nbi.notifications.rev210628.get.notifications.service.output.NotificationServiceBuilder() .setMessage("message") .setServiceName("service1") .setOperationalState(State.InService) diff --git a/nbinotifications/src/test/resources/event.json b/nbinotifications/src/test/resources/event.json index 63c7bec73..9e85e38f2 100644 --- a/nbinotifications/src/test/resources/event.json +++ b/nbinotifications/src/test/resources/event.json @@ -1,7 +1,6 @@ { "notification-service": { - "service-name": "service1", - "service-a-end": { + "service-z-end": { "service-format": "OC", "node-id": "XPONDER-1-2", "service-rate": 1, @@ -41,10 +40,10 @@ } } }, - "common-id": "commond-id", - "operational-state": "inService", "connection-type": "service", - "service-z-end": { + "operational-state": "inService", + "common-id": "commond-id", + "service-a-end": { "service-format": "OC", "node-id": "XPONDER-1-2", "service-rate": 1, @@ -84,6 +83,7 @@ } } }, + "service-name": "service1", "message": "message", "response-failed": "" } diff --git a/nbinotifications/src/test/resources/expected_event.json b/nbinotifications/src/test/resources/expected_event.json index 2b8924df3..499c7e47a 100644 --- a/nbinotifications/src/test/resources/expected_event.json +++ b/nbinotifications/src/test/resources/expected_event.json @@ -1 +1,90 @@ -{"notification-service":{"service-name":"service1","service-a-end":{"service-format":"OC","node-id":"XPONDER-1-2","service-rate":1,"clli":"clli","tx-direction":{"port":{"port-slot":"port slot","port-device-name":"device name","port-rack":"port rack","port-shelf":"port shelf","port-type":"port type","port-sub-slot":"port subslot","port-name":"port name"},"lgx":{"lgx-port-name":"lgx port name","lgx-port-shelf":"lgx port shelf","lgx-port-rack":"lgx port rack","lgx-device-name":"lgx device name"}},"rx-direction":{"port":{"port-slot":"port slot","port-device-name":"device name","port-rack":"port rack","port-shelf":"port shelf","port-type":"port type","port-sub-slot":"port subslot","port-name":"port name"},"lgx":{"lgx-port-name":"lgx port name","lgx-port-shelf":"lgx port shelf","lgx-port-rack":"lgx port rack","lgx-device-name":"lgx device name"}}},"common-id":"commond-id","operational-state":"inService","connection-type":"service","service-z-end":{"service-format":"OC","node-id":"XPONDER-1-2","service-rate":1,"clli":"clli","tx-direction":{"port":{"port-slot":"port slot","port-device-name":"device name","port-rack":"port rack","port-shelf":"port shelf","port-type":"port type","port-sub-slot":"port subslot","port-name":"port name"},"lgx":{"lgx-port-name":"lgx port name","lgx-port-shelf":"lgx port shelf","lgx-port-rack":"lgx port rack","lgx-device-name":"lgx device name"}},"rx-direction":{"port":{"port-slot":"port slot","port-device-name":"device name","port-rack":"port rack","port-shelf":"port shelf","port-type":"port type","port-sub-slot":"port subslot","port-name":"port name"},"lgx":{"lgx-port-name":"lgx port name","lgx-port-shelf":"lgx port shelf","lgx-port-rack":"lgx port rack","lgx-device-name":"lgx device name"}}},"message":"message","response-failed":""}} \ No newline at end of file +{ + "notification-service": { + "service-z-end": { + "clli": "clli", + "service-rate": 1, + "node-id": "XPONDER-1-2", + "rx-direction": { + "lgx": { + "lgx-port-name": "lgx port name", + "lgx-port-shelf": "lgx port shelf", + "lgx-port-rack": "lgx port rack", + "lgx-device-name": "lgx device name" + }, + "port": { + "port-shelf": "port shelf", + "port-rack": "port rack", + "port-device-name": "device name", + "port-name": "port name", + "port-slot": "port slot", + "port-type": "port type", + "port-sub-slot": "port subslot" + } + }, + "service-format": "OC", + "tx-direction": { + "lgx": { + "lgx-port-name": "lgx port name", + "lgx-port-shelf": "lgx port shelf", + "lgx-port-rack": "lgx port rack", + "lgx-device-name": "lgx device name" + }, + "port": { + "port-shelf": "port shelf", + "port-rack": "port rack", + "port-device-name": "device name", + "port-name": "port name", + "port-slot": "port slot", + "port-type": "port type", + "port-sub-slot": "port subslot" + } + } + }, + "connection-type": "service", + "operational-state": "inService", + "common-id": "commond-id", + "service-a-end": { + "clli": "clli", + "service-rate": 1, + "node-id": "XPONDER-1-2", + "rx-direction": { + "lgx": { + "lgx-port-name": "lgx port name", + "lgx-port-shelf": "lgx port shelf", + "lgx-port-rack": "lgx port rack", + "lgx-device-name": "lgx device name" + }, + "port": { + "port-shelf": "port shelf", + "port-rack": "port rack", + "port-device-name": "device name", + "port-name": "port name", + "port-slot": "port slot", + "port-type": "port type", + "port-sub-slot": "port subslot" + } + }, + "service-format": "OC", + "tx-direction": { + "lgx": { + "lgx-port-name": "lgx port name", + "lgx-port-shelf": "lgx port shelf", + "lgx-port-rack": "lgx port rack", + "lgx-device-name": "lgx device name" + }, + "port": { + "port-shelf": "port shelf", + "port-rack": "port rack", + "port-device-name": "device name", + "port-name": "port name", + "port-slot": "port slot", + "port-type": "port type", + "port-sub-slot": "port subslot" + } + } + }, + "service-name": "service1", + "response-failed": "", + "message": "message" + } +} \ No newline at end of file diff --git a/servicehandler/src/main/java/org/opendaylight/transportpce/servicehandler/impl/ServicehandlerImpl.java b/servicehandler/src/main/java/org/opendaylight/transportpce/servicehandler/impl/ServicehandlerImpl.java index 3774f8139..e176348fe 100644 --- a/servicehandler/src/main/java/org/opendaylight/transportpce/servicehandler/impl/ServicehandlerImpl.java +++ b/servicehandler/src/main/java/org/opendaylight/transportpce/servicehandler/impl/ServicehandlerImpl.java @@ -80,10 +80,10 @@ import org.opendaylight.yang.gen.v1.http.org.openroadm.service.rev190531.TempSer import org.opendaylight.yang.gen.v1.http.org.openroadm.service.rev190531.service.delete.input.ServiceDeleteReqInfo.TailRetention; import org.opendaylight.yang.gen.v1.http.org.openroadm.service.rev190531.service.delete.input.ServiceDeleteReqInfoBuilder; import org.opendaylight.yang.gen.v1.http.org.openroadm.service.rev190531.service.list.Services; -import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.PublishNotificationService; -import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.PublishNotificationServiceBuilder; -import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.notification.service.ServiceAEndBuilder; -import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.notification.service.ServiceZEndBuilder; +import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.PublishNotificationService; +import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.PublishNotificationServiceBuilder; +import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.notification.service.ServiceAEndBuilder; +import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.notification.service.ServiceZEndBuilder; import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.yang.types.rev130715.DateAndTime; import org.opendaylight.yangtools.yang.common.RpcResult; import org.slf4j.Logger; diff --git a/servicehandler/src/main/java/org/opendaylight/transportpce/servicehandler/impl/ServicehandlerProvider.java b/servicehandler/src/main/java/org/opendaylight/transportpce/servicehandler/impl/ServicehandlerProvider.java index 0fec8b9e7..8d29234c2 100644 --- a/servicehandler/src/main/java/org/opendaylight/transportpce/servicehandler/impl/ServicehandlerProvider.java +++ b/servicehandler/src/main/java/org/opendaylight/transportpce/servicehandler/impl/ServicehandlerProvider.java @@ -9,18 +9,24 @@ package org.opendaylight.transportpce.servicehandler.impl; import org.opendaylight.mdsal.binding.api.DataBroker; +import org.opendaylight.mdsal.binding.api.DataTreeIdentifier; import org.opendaylight.mdsal.binding.api.NotificationService; import org.opendaylight.mdsal.binding.api.RpcProviderService; +import org.opendaylight.mdsal.common.api.LogicalDatastoreType; import org.opendaylight.transportpce.servicehandler.listeners.NetworkModelListenerImpl; import org.opendaylight.transportpce.servicehandler.listeners.PceListenerImpl; import org.opendaylight.transportpce.servicehandler.listeners.RendererListenerImpl; +import org.opendaylight.transportpce.servicehandler.listeners.ServiceListener; import org.opendaylight.transportpce.servicehandler.service.ServiceDataStoreOperations; import org.opendaylight.yang.gen.v1.http.org.opendaylight.transportpce.networkmodel.rev201116.TransportpceNetworkmodelListener; import org.opendaylight.yang.gen.v1.http.org.opendaylight.transportpce.pce.rev210701.TransportpcePceListener; import org.opendaylight.yang.gen.v1.http.org.opendaylight.transportpce.renderer.rev210618.TransportpceRendererListener; import org.opendaylight.yang.gen.v1.http.org.openroadm.service.rev190531.OrgOpenroadmServiceService; +import org.opendaylight.yang.gen.v1.http.org.openroadm.service.rev190531.ServiceList; +import org.opendaylight.yang.gen.v1.http.org.openroadm.service.rev190531.service.list.Services; import org.opendaylight.yangtools.concepts.ListenerRegistration; import org.opendaylight.yangtools.concepts.ObjectRegistration; +import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -33,23 +39,27 @@ import org.slf4j.LoggerFactory; public class ServicehandlerProvider { private static final Logger LOG = LoggerFactory.getLogger(ServicehandlerProvider.class); + private static final InstanceIdentifier SERVICE = InstanceIdentifier.builder(ServiceList.class) + .child(Services.class).build(); private final DataBroker dataBroker; private final RpcProviderService rpcService; private final NotificationService notificationService; private ListenerRegistration pcelistenerRegistration; + private ListenerRegistration serviceDataTreeChangeListenerRegistration; private ListenerRegistration rendererlistenerRegistration; private ListenerRegistration networkmodellistenerRegistration; private ObjectRegistration rpcRegistration; private ServiceDataStoreOperations serviceDataStoreOperations; private PceListenerImpl pceListenerImpl; + private ServiceListener serviceListener; private RendererListenerImpl rendererListenerImpl; private NetworkModelListenerImpl networkModelListenerImpl; private ServicehandlerImpl servicehandler; public ServicehandlerProvider(final DataBroker dataBroker, RpcProviderService rpcProviderService, NotificationService notificationService, ServiceDataStoreOperations serviceDataStoreOperations, - PceListenerImpl pceListenerImpl, RendererListenerImpl rendererListenerImpl, + PceListenerImpl pceListenerImpl, ServiceListener serviceListener, RendererListenerImpl rendererListenerImpl, NetworkModelListenerImpl networkModelListenerImpl, ServicehandlerImpl servicehandler) { this.dataBroker = dataBroker; this.rpcService = rpcProviderService; @@ -57,6 +67,7 @@ public class ServicehandlerProvider { this.serviceDataStoreOperations = serviceDataStoreOperations; this.serviceDataStoreOperations.initialize(); this.pceListenerImpl = pceListenerImpl; + this.serviceListener = serviceListener; this.rendererListenerImpl = rendererListenerImpl; this.networkModelListenerImpl = networkModelListenerImpl; this.servicehandler = servicehandler; @@ -68,6 +79,8 @@ public class ServicehandlerProvider { public void init() { LOG.info("ServicehandlerProvider Session Initiated"); pcelistenerRegistration = notificationService.registerNotificationListener(pceListenerImpl); + serviceDataTreeChangeListenerRegistration = dataBroker.registerDataTreeChangeListener( + DataTreeIdentifier.create(LogicalDatastoreType.OPERATIONAL, SERVICE), serviceListener); rendererlistenerRegistration = notificationService.registerNotificationListener(rendererListenerImpl); networkmodellistenerRegistration = notificationService.registerNotificationListener(networkModelListenerImpl); rpcRegistration = rpcService.registerRpcImplementation(OrgOpenroadmServiceService.class, servicehandler); @@ -79,6 +92,7 @@ public class ServicehandlerProvider { public void close() { LOG.info("ServicehandlerProvider Closed"); pcelistenerRegistration.close(); + serviceDataTreeChangeListenerRegistration.close(); rendererlistenerRegistration.close(); networkmodellistenerRegistration.close(); rpcRegistration.close(); diff --git a/servicehandler/src/main/java/org/opendaylight/transportpce/servicehandler/listeners/PceListenerImpl.java b/servicehandler/src/main/java/org/opendaylight/transportpce/servicehandler/listeners/PceListenerImpl.java index 8782bb79c..305c4156e 100644 --- a/servicehandler/src/main/java/org/opendaylight/transportpce/servicehandler/listeners/PceListenerImpl.java +++ b/servicehandler/src/main/java/org/opendaylight/transportpce/servicehandler/listeners/PceListenerImpl.java @@ -28,10 +28,10 @@ import org.opendaylight.yang.gen.v1.http.org.openroadm.service.rev190531.service import org.opendaylight.yang.gen.v1.http.org.transportpce.b.c._interface.service.types.rev200128.RpcStatusEx; import org.opendaylight.yang.gen.v1.http.org.transportpce.b.c._interface.service.types.rev200128.response.parameters.sp.ResponseParameters; import org.opendaylight.yang.gen.v1.http.org.transportpce.b.c._interface.service.types.rev200128.response.parameters.sp.ResponseParametersBuilder; -import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.PublishNotificationService; -import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.PublishNotificationServiceBuilder; -import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.notification.service.ServiceAEndBuilder; -import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.notification.service.ServiceZEndBuilder; +import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.PublishNotificationService; +import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.PublishNotificationServiceBuilder; +import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.notification.service.ServiceAEndBuilder; +import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.notification.service.ServiceZEndBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/servicehandler/src/main/java/org/opendaylight/transportpce/servicehandler/listeners/RendererListenerImpl.java b/servicehandler/src/main/java/org/opendaylight/transportpce/servicehandler/listeners/RendererListenerImpl.java index 82eb67760..b8519b6ff 100644 --- a/servicehandler/src/main/java/org/opendaylight/transportpce/servicehandler/listeners/RendererListenerImpl.java +++ b/servicehandler/src/main/java/org/opendaylight/transportpce/servicehandler/listeners/RendererListenerImpl.java @@ -26,10 +26,10 @@ import org.opendaylight.yang.gen.v1.http.org.openroadm.common.state.types.rev191 import org.opendaylight.yang.gen.v1.http.org.openroadm.equipment.states.types.rev191129.AdminStates; import org.opendaylight.yang.gen.v1.http.org.openroadm.service.rev190531.service.list.Services; import org.opendaylight.yang.gen.v1.http.transportpce.topology.rev210511.OtnLinkType; -import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.PublishNotificationService; -import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.PublishNotificationServiceBuilder; -import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.notification.service.ServiceAEndBuilder; -import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.notification.service.ServiceZEndBuilder; +import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.PublishNotificationService; +import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.PublishNotificationServiceBuilder; +import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.notification.service.ServiceAEndBuilder; +import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.notification.service.ServiceZEndBuilder; import org.opendaylight.yangtools.yang.common.Uint32; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/servicehandler/src/main/java/org/opendaylight/transportpce/servicehandler/listeners/ServiceListener.java b/servicehandler/src/main/java/org/opendaylight/transportpce/servicehandler/listeners/ServiceListener.java new file mode 100644 index 000000000..ee64141f0 --- /dev/null +++ b/servicehandler/src/main/java/org/opendaylight/transportpce/servicehandler/listeners/ServiceListener.java @@ -0,0 +1,92 @@ +/* + * Copyright © 2021 Orange and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.transportpce.servicehandler.listeners; + +import java.util.Collection; +import org.opendaylight.mdsal.binding.api.DataBroker; +import org.opendaylight.mdsal.binding.api.DataObjectModification; +import org.opendaylight.mdsal.binding.api.DataTreeChangeListener; +import org.opendaylight.mdsal.binding.api.DataTreeModification; +import org.opendaylight.mdsal.binding.api.NotificationPublishService; +import org.opendaylight.yang.gen.v1.http.org.openroadm.common.state.types.rev181130.State; +import org.opendaylight.yang.gen.v1.http.org.openroadm.service.rev190531.service.list.Services; +import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.PublishNotificationAlarmService; +import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.PublishNotificationAlarmServiceBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ServiceListener implements DataTreeChangeListener { + + private static final Logger LOG = LoggerFactory.getLogger(ServiceListener.class); + private static final String TOPIC = "ServiceListener"; + private final DataBroker dataBroker; + private NotificationPublishService notificationPublishService; + + public ServiceListener(final DataBroker dataBroker, NotificationPublishService notificationPublishService) { + this.dataBroker = dataBroker; + this.notificationPublishService = notificationPublishService; + } + + public void onDataTreeChanged(Collection> changes) { + LOG.info("onDataTreeChanged - {}", this.getClass().getSimpleName()); + for (DataTreeModification change : changes) { + DataObjectModification rootService = change.getRootNode(); + if (rootService.getDataBefore() == null) { + continue; + } + String serviceName = rootService.getDataBefore().key().getServiceName(); + switch (rootService.getModificationType()) { + case DELETE: + LOG.info("Service {} correctly deleted from controller", serviceName); + break; + case WRITE: + Services input = rootService.getDataAfter(); + if (rootService.getDataBefore().getOperationalState() == State.InService + && rootService.getDataAfter().getOperationalState() == State.OutOfService) { + LOG.info("Service {} is becoming outOfService", serviceName); + sendNbiNotification(new PublishNotificationAlarmServiceBuilder() + .setServiceName(input.getServiceName()) + .setConnectionType(input.getConnectionType()) + .setMessage("The service is now outOfService") + .setOperationalState(State.OutOfService) + .setTopic(TOPIC) + .build()); + } + else if (rootService.getDataBefore().getOperationalState() == State.OutOfService + && rootService.getDataAfter().getOperationalState() == State.InService) { + LOG.info("Service {} is becoming InService", serviceName); + sendNbiNotification(new PublishNotificationAlarmServiceBuilder() + .setServiceName(input.getServiceName()) + .setConnectionType(input.getConnectionType()) + .setMessage("The service is now inService") + .setOperationalState(State.InService) + .setTopic(TOPIC) + .build()); + } + break; + default: + LOG.debug("Unknown modification type {}", rootService.getModificationType().name()); + break; + } + } + } + + /** + * Send notification to NBI notification in order to publish message. + * + * @param service PublishNotificationAlarmService + */ + private void sendNbiNotification(PublishNotificationAlarmService service) { + try { + notificationPublishService.putNotification(service); + } catch (InterruptedException e) { + LOG.warn("Cannot send notification to nbi", e); + Thread.currentThread().interrupt(); + } + } +} diff --git a/servicehandler/src/main/resources/OSGI-INF/blueprint/servicehandler-blueprint.xml b/servicehandler/src/main/resources/OSGI-INF/blueprint/servicehandler-blueprint.xml index 62a7e6400..8e2ba983b 100644 --- a/servicehandler/src/main/resources/OSGI-INF/blueprint/servicehandler-blueprint.xml +++ b/servicehandler/src/main/resources/OSGI-INF/blueprint/servicehandler-blueprint.xml @@ -43,6 +43,11 @@ Author: Martial Coulibaly on behalf of Orange + + + + + @@ -74,6 +79,7 @@ Author: Martial Coulibaly on behalf of Orange + diff --git a/servicehandler/src/test/java/org/opendaylight/transportpce/servicehandler/impl/ServicehandlerProviderTest.java b/servicehandler/src/test/java/org/opendaylight/transportpce/servicehandler/impl/ServicehandlerProviderTest.java index e458df832..85b51d7cb 100644 --- a/servicehandler/src/test/java/org/opendaylight/transportpce/servicehandler/impl/ServicehandlerProviderTest.java +++ b/servicehandler/src/test/java/org/opendaylight/transportpce/servicehandler/impl/ServicehandlerProviderTest.java @@ -20,6 +20,7 @@ import org.opendaylight.mdsal.binding.api.RpcProviderService; import org.opendaylight.transportpce.servicehandler.listeners.NetworkModelListenerImpl; import org.opendaylight.transportpce.servicehandler.listeners.PceListenerImpl; import org.opendaylight.transportpce.servicehandler.listeners.RendererListenerImpl; +import org.opendaylight.transportpce.servicehandler.listeners.ServiceListener; import org.opendaylight.transportpce.servicehandler.service.ServiceDataStoreOperations; import org.opendaylight.transportpce.test.AbstractTest; @@ -34,6 +35,9 @@ public class ServicehandlerProviderTest extends AbstractTest { @Mock PceListenerImpl pceListenerImpl; + @Mock + ServiceListener serviceListener; + @Mock RendererListenerImpl rendererListenerImpl; @@ -43,6 +47,7 @@ public class ServicehandlerProviderTest extends AbstractTest { @Mock ServicehandlerImpl servicehandler; + private AutoCloseable closeable; @Before @@ -54,8 +59,8 @@ public class ServicehandlerProviderTest extends AbstractTest { public void testInitRegisterServiceHandlerToRpcRegistry() { ServicehandlerProvider provider = new ServicehandlerProvider( getDataBroker(), rpcProviderRegistry, - getNotificationService() , serviceDataStoreOperations, pceListenerImpl, rendererListenerImpl, - networkModelListenerImpl, servicehandler); + getNotificationService() , serviceDataStoreOperations, pceListenerImpl, serviceListener, + rendererListenerImpl, networkModelListenerImpl, servicehandler); provider.init(); -- 2.36.6