Add service listener to notify Kafka 31/96531/9
authorThierry Jiao <thierry.jiao@orange.com>
Mon, 14 Jun 2021 12:56:15 +0000 (14:56 +0200)
committerThierry Jiao <thierry.jiao@orange.com>
Tue, 3 Aug 2021 14:51:43 +0000 (16:51 +0200)
  - Implement a new listener 'ServiceListener' that sends a
    notification to the topic 'alarmservice' of the Kafka broker
    when a service breakdowns or is restored
  - Add a new RPC API named 'GetNotificationsAlarmService' to retrieve
    notifications from the topic 'alarmservice'
  - Add a new subscriber 'SubscriberAlarm' dedicated to read
    alarm notifications from topics Kafka
  - Add a new publisher 'PublisherAlarm' dedicated to write alarm
    notifications from topics Kafka
  - Update nbi-notifications unit tests

JIRA: TRNSPRTPCE-471
Signed-off-by: Thierry Jiao <thierry.jiao@orange.com>
Change-Id: I4b42c8a3282a805791ae6d9d046031dd385faaae

40 files changed:
api/src/main/yang/nbi-notifications@2021-06-28.yang [moved from api/src/main/yang/nbi-notifications@2020-11-30.yang with 58% similarity]
dmaap-client/src/main/java/org/opendaylight/transportpce/dmaap/client/impl/DmaapClientProvider.java
dmaap-client/src/main/java/org/opendaylight/transportpce/dmaap/client/listener/NbiNotificationsListenerImpl.java
dmaap-client/src/main/java/org/opendaylight/transportpce/dmaap/client/resource/EventsApi.java
dmaap-client/src/main/java/org/opendaylight/transportpce/dmaap/client/resource/config/PublishNotificationServiceModule.java
dmaap-client/src/main/java/org/opendaylight/transportpce/dmaap/client/resource/config/PublishNotificationServiceSerializer.java
dmaap-client/src/main/java/org/opendaylight/transportpce/dmaap/client/resource/config/ServiceAEndSerializer.java
dmaap-client/src/main/java/org/opendaylight/transportpce/dmaap/client/resource/config/ServiceZEndSerializer.java
dmaap-client/src/test/java/org/opendaylight/transportpce/dmaap/client/listener/NbiNotificationsListenerImplTest.java
lighty/src/main/java/io/lighty/controllers/tpce/module/TransportPCEImpl.java
lighty/src/main/java/io/lighty/controllers/tpce/utils/TPCEUtils.java
nbinotifications/src/main/java/org/opendaylight/transportpce/nbinotifications/consumer/Subscriber.java
nbinotifications/src/main/java/org/opendaylight/transportpce/nbinotifications/consumer/SubscriberAlarm.java [new file with mode: 0644]
nbinotifications/src/main/java/org/opendaylight/transportpce/nbinotifications/impl/NbiNotificationsImpl.java
nbinotifications/src/main/java/org/opendaylight/transportpce/nbinotifications/impl/NbiNotificationsProvider.java
nbinotifications/src/main/java/org/opendaylight/transportpce/nbinotifications/listener/NbiNotificationsListenerImpl.java
nbinotifications/src/main/java/org/opendaylight/transportpce/nbinotifications/producer/Publisher.java
nbinotifications/src/main/java/org/opendaylight/transportpce/nbinotifications/producer/PublisherAlarm.java [new file with mode: 0644]
nbinotifications/src/main/java/org/opendaylight/transportpce/nbinotifications/serialization/NotificationAlarmServiceDeserializer.java [new file with mode: 0644]
nbinotifications/src/main/java/org/opendaylight/transportpce/nbinotifications/serialization/NotificationAlarmServiceSerializer.java [new file with mode: 0644]
nbinotifications/src/main/java/org/opendaylight/transportpce/nbinotifications/serialization/NotificationServiceDeserializer.java
nbinotifications/src/main/java/org/opendaylight/transportpce/nbinotifications/serialization/NotificationServiceSerializer.java
nbinotifications/src/main/resources/OSGI-INF/blueprint/nobinotifications-blueprint.xml
nbinotifications/src/test/java/org/opendaylight/transportpce/nbinotifications/consumer/SubscriberTest.java
nbinotifications/src/test/java/org/opendaylight/transportpce/nbinotifications/impl/NbiNotificationsImplTest.java
nbinotifications/src/test/java/org/opendaylight/transportpce/nbinotifications/impl/NbiNotificationsProviderTest.java
nbinotifications/src/test/java/org/opendaylight/transportpce/nbinotifications/listener/NbiNotificationsListenerImplTest.java
nbinotifications/src/test/java/org/opendaylight/transportpce/nbinotifications/producer/PublisherTest.java
nbinotifications/src/test/java/org/opendaylight/transportpce/nbinotifications/serialization/NotificationServiceDeserializerTest.java
nbinotifications/src/test/java/org/opendaylight/transportpce/nbinotifications/serialization/NotificationServiceSerializerTest.java
nbinotifications/src/test/java/org/opendaylight/transportpce/nbinotifications/utils/NotificationServiceDataUtils.java
nbinotifications/src/test/resources/event.json
nbinotifications/src/test/resources/expected_event.json
servicehandler/src/main/java/org/opendaylight/transportpce/servicehandler/impl/ServicehandlerImpl.java
servicehandler/src/main/java/org/opendaylight/transportpce/servicehandler/impl/ServicehandlerProvider.java
servicehandler/src/main/java/org/opendaylight/transportpce/servicehandler/listeners/PceListenerImpl.java
servicehandler/src/main/java/org/opendaylight/transportpce/servicehandler/listeners/RendererListenerImpl.java
servicehandler/src/main/java/org/opendaylight/transportpce/servicehandler/listeners/ServiceListener.java [new file with mode: 0644]
servicehandler/src/main/resources/OSGI-INF/blueprint/servicehandler-blueprint.xml
servicehandler/src/test/java/org/opendaylight/transportpce/servicehandler/impl/ServicehandlerProviderTest.java

similarity index 58%
rename from api/src/main/yang/nbi-notifications@2020-11-30.yang
rename to api/src/main/yang/nbi-notifications@2021-06-28.yang
index aa068cd5317735db48d50f6ae2ab5c6273557a79..d61c3993a56aeb56c61338279279d182699be33c 100644 (file)
@@ -24,6 +24,11 @@ module nbi-notifications {
     "YANG definitions for using REST API in NBI notifications module. Copyright
      (c) 2020 ORANGE and others. All rights reserved.";
 
+  revision 2021-06-28 {
+    description
+      "Implement new models, RPC for service alarms";
+  }
+
   revision 2020-11-30 {
     description
       "Initial revision of NBI notifications";
@@ -72,14 +77,46 @@ module nbi-notifications {
     }
   }
 
+  grouping notification-alarm-service {
+    leaf message {
+      type string;
+      mandatory true;
+      description
+        "Message for the specified service";
+    }
+    leaf service-name {
+      type string;
+      mandatory true;
+      description
+        "Identifier for the service to be created in the ROADM network, e.g., CLFI, CLCI, etc.
+        This is reported against the service, but may not get reflected in the service in the network.";
+    }
+    leaf connection-type {
+      type org-openroadm-common-service-types:connection-type;
+      mandatory true;
+    }
+    leaf operational-state {
+      type org-openroadm-common-state-types:state;
+      config false;
+      description
+        "Operational State: Actual state of service";
+    }
+  }
+
   container notification-service {
     description
       "Model used to send a notification from a service request";
     uses notification-service;
   }
 
+  container notification-alarm-service {
+    description
+      "Model used to send a notification from the service listener";
+    uses notification-alarm-service;
+    }
+
   rpc get-notifications-service {
-    description "Get the notifications service send by ServiceHandler by filtering through connection type";
+    description "Get the notifications service sent by ServiceHandler through filtering connection type";
     input {
       leaf connection-type {
         type org-openroadm-common-service-types:connection-type;
@@ -107,6 +144,35 @@ module nbi-notifications {
     }
   }
 
+  rpc get-notifications-alarm-service {
+    description "Get the notifications alarm service sent by ServiceListener through filtering connection type";
+    input {
+      leaf connection-type {
+        type org-openroadm-common-service-types:connection-type;
+        mandatory true;
+        description
+          "Type connection of the service";
+      }
+      leaf id-consumer {
+        type string;
+        mandatory true;
+        description
+          "Unique ID for the consumer";
+      }
+      leaf group-id {
+        type string;
+        mandatory true;
+        description
+          "ID Group for the consumer";
+      }
+    }
+    output {
+      list notification-alarm-service {
+        uses notification-alarm-service;
+      }
+    }
+  }
+
   notification publish-notification-service {
     description "Publish the notifications service for topic";
     leaf topic {
@@ -117,4 +183,15 @@ module nbi-notifications {
      }
      uses notification-service;
   }
+
+  notification publish-notification-alarm-service {
+    description "Publish the notifications service alarm for topic";
+    leaf topic {
+      type string;
+      mandatory true;
+      description
+        "Topic where to send the notification service alarm";
+     }
+     uses notification-alarm-service;
+  }
 }
index adc4d062f7dc119f798ea43098e28440f3796a7f..1da61d040f0c7ab2dc03fac4794b9ea0a9caa166 100644 (file)
@@ -9,7 +9,7 @@ package org.opendaylight.transportpce.dmaap.client.impl;
 
 import org.opendaylight.mdsal.binding.api.NotificationService;
 import org.opendaylight.transportpce.dmaap.client.listener.NbiNotificationsListenerImpl;
-import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.NbiNotificationsListener;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.NbiNotificationsListener;
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
index ed84dae964be554a3f23e67cfb3c997d0448c1ff..d5003ce128a9fcf04e883614060a6f6a83379757 100644 (file)
@@ -17,8 +17,9 @@ import org.glassfish.jersey.logging.LoggingFeature;
 import org.opendaylight.transportpce.dmaap.client.resource.EventsApi;
 import org.opendaylight.transportpce.dmaap.client.resource.config.JsonConfigurator;
 import org.opendaylight.transportpce.dmaap.client.resource.model.CreatedEvent;
-import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.NbiNotificationsListener;
-import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.PublishNotificationService;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.NbiNotificationsListener;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.PublishNotificationAlarmService;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.PublishNotificationService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -52,4 +53,9 @@ public class NbiNotificationsListenerImpl implements NbiNotificationsListener {
 
     }
 
+    @Override
+    public void onPublishNotificationAlarmService(PublishNotificationAlarmService notification) {
+
+    }
+
 }
index e9d936cba32abecf9cdb2217d75a6305aa01c8c6..b1a5ae03659151a58c1238591a8bb6f9703eb5d6 100644 (file)
@@ -14,7 +14,7 @@ import javax.ws.rs.PathParam;
 import javax.ws.rs.Produces;
 import javax.ws.rs.core.MediaType;
 import org.opendaylight.transportpce.dmaap.client.resource.model.CreatedEvent;
-import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.PublishNotificationService;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.PublishNotificationService;
 
 @Path("/events")
 public interface EventsApi {
index 6b615576a1a74f059fcce5f576a603530a43e3ff..b33de94ea40830dcd862ef8d7cccca5f0b237932 100644 (file)
@@ -13,9 +13,9 @@ import org.opendaylight.yang.gen.v1.http.org.openroadm.common.service.types.rev1
 import org.opendaylight.yang.gen.v1.http.org.openroadm.common.service.types.rev190531.service.endpoint.TxDirection;
 import org.opendaylight.yang.gen.v1.http.org.openroadm.common.service.types.rev190531.service.lgx.Lgx;
 import org.opendaylight.yang.gen.v1.http.org.openroadm.common.service.types.rev190531.service.port.Port;
-import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.PublishNotificationService;
-import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.notification.service.ServiceAEnd;
-import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.notification.service.ServiceZEnd;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.PublishNotificationService;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.notification.service.ServiceAEnd;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.notification.service.ServiceZEnd;
 
 //This class is a temporary workaround while waiting jackson
 //support in yang tools https://git.opendaylight.org/gerrit/c/yangtools/+/94852
index 0ed9b7b4d8a3141399b2dc57de595be9f7516403..0b282b4c99d297f24fe4f94d5efc562dd54a17cb 100644 (file)
@@ -11,7 +11,7 @@ import com.fasterxml.jackson.core.JsonGenerator;
 import com.fasterxml.jackson.databind.SerializerProvider;
 import com.fasterxml.jackson.databind.ser.std.StdSerializer;
 import java.io.IOException;
-import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.PublishNotificationService;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.PublishNotificationService;
 
 // This class is a temporary workaround while waiting jackson
 // support in yang tools https://git.opendaylight.org/gerrit/c/yangtools/+/94852
index 55bfaea7d83929d78560b866dff24dc585b38647..8e28cb46d628818be030c8724ae057b132465443 100644 (file)
@@ -11,7 +11,7 @@ import com.fasterxml.jackson.core.JsonGenerator;
 import com.fasterxml.jackson.databind.SerializerProvider;
 import com.fasterxml.jackson.databind.ser.std.StdSerializer;
 import java.io.IOException;
-import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.notification.service.ServiceAEnd;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.notification.service.ServiceAEnd;
 
 // This class is a temporary workaround while waiting jackson
 // support in yang tools https://git.opendaylight.org/gerrit/c/yangtools/+/94852
index 1d65b1107bc00ba511efb025ecce307459903be4..485d00ac55865851c3bcf5f8adaf45134106b165 100644 (file)
@@ -11,7 +11,7 @@ import com.fasterxml.jackson.core.JsonGenerator;
 import com.fasterxml.jackson.databind.SerializerProvider;
 import com.fasterxml.jackson.databind.ser.std.StdSerializer;
 import java.io.IOException;
-import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.notification.service.ServiceZEnd;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.notification.service.ServiceZEnd;
 
 // This class is a temporary workaround while waiting jackson
 // support in yang tools https://git.opendaylight.org/gerrit/c/yangtools/+/94852
index 710bf1b313588f35c5a5ec4d8487385b01137354..39bdc0c563d81a0bb30600be6c125e69553f9cf2 100644 (file)
@@ -24,11 +24,11 @@ import org.opendaylight.yang.gen.v1.http.org.openroadm.common.service.types.rev1
 import org.opendaylight.yang.gen.v1.http.org.openroadm.common.service.types.rev190531.service.endpoint.TxDirectionBuilder;
 import org.opendaylight.yang.gen.v1.http.org.openroadm.common.state.types.rev181130.State;
 import org.opendaylight.yang.gen.v1.http.org.openroadm.service.format.rev190531.ServiceFormat;
-import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.NbiNotificationsListener;
-import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.PublishNotificationService;
-import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.PublishNotificationServiceBuilder;
-import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.notification.service.ServiceAEndBuilder;
-import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.notification.service.ServiceZEndBuilder;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.NbiNotificationsListener;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.PublishNotificationService;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.PublishNotificationServiceBuilder;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.notification.service.ServiceAEndBuilder;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.notification.service.ServiceZEndBuilder;
 import org.opendaylight.yangtools.yang.common.Uint32;
 import org.slf4j.LoggerFactory;
 
index d18791064effdde531a52dd6883d0add3d7e5191..7ea47d2780420d1884ea15bcb063c62226a892a0 100644 (file)
@@ -75,6 +75,7 @@ import org.opendaylight.transportpce.servicehandler.impl.ServicehandlerProvider;
 import org.opendaylight.transportpce.servicehandler.listeners.NetworkModelListenerImpl;
 import org.opendaylight.transportpce.servicehandler.listeners.PceListenerImpl;
 import org.opendaylight.transportpce.servicehandler.listeners.RendererListenerImpl;
+import org.opendaylight.transportpce.servicehandler.listeners.ServiceListener;
 import org.opendaylight.transportpce.servicehandler.service.ServiceDataStoreOperations;
 import org.opendaylight.transportpce.servicehandler.service.ServiceDataStoreOperationsImpl;
 import org.opendaylight.transportpce.tapi.R2RTapiLinkDiscovery;
@@ -121,8 +122,9 @@ public class TransportPCEImpl extends AbstractLightyModule implements TransportP
     /**
      * List of publisher topics.
      */
-    private final List<String> publisherTopicList =
-            Arrays.asList("PceListener", "ServiceHandlerOperations", "ServiceHandler", "RendererListener");
+    private final List<String> publisherTopicList = Arrays.asList("PceListener", "ServiceHandlerOperations",
+            "ServiceHandler", "RendererListener");
+    private final List<String> publisherTopicAlarmList = Arrays.asList("ServiceListener");
 
     public TransportPCEImpl(LightyServices lightyServices, boolean activateNbiNotification) {
         LOG.info("Initializing transaction providers ...");
@@ -192,6 +194,8 @@ public class TransportPCEImpl extends AbstractLightyModule implements TransportP
             lightyServices.getBindingNotificationPublishService(), networkModelService);
         PceListenerImpl pceListenerImpl = new PceListenerImpl(rendererServiceOperations, pathComputationService,
             lightyServices.getBindingNotificationPublishService(), serviceDataStoreOperations);
+        ServiceListener serviceListener = new ServiceListener(lightyServices.getBindingDataBroker(),
+                lightyServices.getBindingNotificationPublishService());
         NetworkModelListenerImpl networkModelListenerImpl = new NetworkModelListenerImpl(
                 lightyServices.getBindingNotificationPublishService(), serviceDataStoreOperations);
         ServicehandlerImpl servicehandler = new ServicehandlerImpl(lightyServices.getBindingDataBroker(),
@@ -199,8 +203,8 @@ public class TransportPCEImpl extends AbstractLightyModule implements TransportP
             pceListenerImpl, rendererListenerImpl, networkModelListenerImpl, serviceDataStoreOperations, "N/A");
         servicehandlerProvider = new ServicehandlerProvider(lightyServices.getBindingDataBroker(),
                 lightyServices.getRpcProviderService(), lightyServices.getNotificationService(),
-                serviceDataStoreOperations, pceListenerImpl, rendererListenerImpl, networkModelListenerImpl,
-                servicehandler);
+                serviceDataStoreOperations, pceListenerImpl, serviceListener, rendererListenerImpl,
+                networkModelListenerImpl, servicehandler);
 
         LOG.info("Creating tapi beans ...");
         R2RTapiLinkDiscovery tapilinkDiscoveryImpl = new R2RTapiLinkDiscovery(lightyServices.getBindingDataBroker(),
@@ -225,7 +229,7 @@ public class TransportPCEImpl extends AbstractLightyModule implements TransportP
         if (activateNbiNotification) {
             LOG.info("Creating nbi-notifications beans ...");
             nbiNotificationsProvider = new NbiNotificationsProvider(
-                    publisherTopicList, null, null, lightyServices.getRpcProviderService(),
+                    publisherTopicList, publisherTopicAlarmList, null, null, lightyServices.getRpcProviderService(),
                     lightyServices.getNotificationService(), lightyServices.getAdapterContext().currentSerializer());
         }
     }
index 489bf880423e8aced080dd642877f0d0a03a1122..2b2575c3019886f6dae776235b7d2c822fce2e67 100644 (file)
@@ -348,7 +348,7 @@ public final class TPCEUtils {
                     .getInstance(),
             org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.tapi.rev180928.$YangModuleInfoImpl
                     .getInstance(),
-            org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.$YangModuleInfoImpl
+            org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.$YangModuleInfoImpl
                     .getInstance());
 
     private static final Set<YangModuleInfo> TPCE_YANG_MODEL = Stream.concat(
index cb044b126607f8b68c7a488c10719a5ecaf5b44b..735c6a8d25a2de0edb64b3790716300a72f2d7a3 100644 (file)
@@ -23,7 +23,7 @@ import org.opendaylight.transportpce.common.converter.JsonStringConverter;
 import org.opendaylight.transportpce.nbinotifications.serialization.ConfigConstants;
 import org.opendaylight.transportpce.nbinotifications.serialization.NotificationServiceDeserializer;
 import org.opendaylight.transportpce.nbinotifications.utils.NbiNotificationsUtils;
-import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.get.notifications.service.output.NotificationService;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.get.notifications.service.output.NotificationService;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -35,7 +35,7 @@ public class Subscriber {
 
     public Subscriber(String id, String groupId, String suscriberServer,
             JsonStringConverter<org.opendaylight.yang.gen.v1
-                .nbi.notifications.rev201130.NotificationService> deserializer) {
+                .nbi.notifications.rev210628.NotificationService> deserializer) {
         Properties propsConsumer = NbiNotificationsUtils.loadProperties("subscriber.properties");
         propsConsumer.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
         propsConsumer.put(ConsumerConfig.CLIENT_ID_CONFIG, id);
diff --git a/nbinotifications/src/main/java/org/opendaylight/transportpce/nbinotifications/consumer/SubscriberAlarm.java b/nbinotifications/src/main/java/org/opendaylight/transportpce/nbinotifications/consumer/SubscriberAlarm.java
new file mode 100644 (file)
index 0000000..f3bf9df
--- /dev/null
@@ -0,0 +1,73 @@
+/*
+ * Copyright © 2021 Orange, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.transportpce.nbinotifications.consumer;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.opendaylight.transportpce.common.converter.JsonStringConverter;
+import org.opendaylight.transportpce.nbinotifications.serialization.ConfigConstants;
+import org.opendaylight.transportpce.nbinotifications.serialization.NotificationAlarmServiceDeserializer;
+import org.opendaylight.transportpce.nbinotifications.utils.NbiNotificationsUtils;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.get.notifications.alarm.service.output.NotificationAlarmService;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SubscriberAlarm {
+    private static final Logger LOG = LoggerFactory.getLogger(SubscriberAlarm.class);
+
+    private final Consumer<String, NotificationAlarmService> consumer;
+
+    public SubscriberAlarm(String id, String groupId, String subscriberServer,
+                           JsonStringConverter<org.opendaylight.yang.gen.v1
+                .nbi.notifications.rev210628.NotificationAlarmService> deserializer) {
+        Properties propsConsumer = NbiNotificationsUtils.loadProperties("subscriber.properties");
+        propsConsumer.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
+        propsConsumer.put(ConsumerConfig.CLIENT_ID_CONFIG, id);
+        propsConsumer.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+        propsConsumer.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG , NotificationAlarmServiceDeserializer.class);
+        propsConsumer.put(ConfigConstants.CONVERTER , deserializer);
+        if (subscriberServer != null && !subscriberServer.isBlank()) {
+            propsConsumer.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, subscriberServer);
+        }
+        LOG.info("Subscribing for group id {}, client config id {} with properties {}", groupId, id, propsConsumer);
+        consumer = new KafkaConsumer<>(propsConsumer);
+    }
+
+    public List<NotificationAlarmService> subscribeAlarm(String topicName) {
+        LOG.info("Subscribe request to topic '{}' ", topicName);
+        consumer.subscribe(Collections.singleton(topicName));
+        final ConsumerRecords<String, NotificationAlarmService> consumerRecords = consumer
+                .poll(Duration.ofMillis(1000));
+        List<NotificationAlarmService> notificationAlarmServiceList = new ArrayList<>();
+        YangInstanceIdentifier.of(NotificationAlarmService.QNAME);
+        for (ConsumerRecord<String, NotificationAlarmService> record : consumerRecords) {
+            if (record.value() != null) {
+                notificationAlarmServiceList.add(record.value());
+            }
+        }
+        LOG.info("Getting records '{}' ", notificationAlarmServiceList);
+        consumer.unsubscribe();
+        consumer.close();
+        return notificationAlarmServiceList;
+    }
+
+    @VisibleForTesting public SubscriberAlarm(Consumer<String, NotificationAlarmService> consumer) {
+        this.consumer = consumer;
+    }
+}
index 073ce49f8315e5b5cf3905ad5d70fff5f3736287..d7201d87ad63fc627f19d1dc88f0a678af5aca1b 100644 (file)
@@ -11,11 +11,16 @@ import com.google.common.util.concurrent.ListenableFuture;
 import java.util.List;
 import org.opendaylight.transportpce.common.converter.JsonStringConverter;
 import org.opendaylight.transportpce.nbinotifications.consumer.Subscriber;
-import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.GetNotificationsServiceInput;
-import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.GetNotificationsServiceOutput;
-import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.GetNotificationsServiceOutputBuilder;
-import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.NbiNotificationsService;
-import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.get.notifications.service.output.NotificationService;
+import org.opendaylight.transportpce.nbinotifications.consumer.SubscriberAlarm;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.GetNotificationsAlarmServiceInput;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.GetNotificationsAlarmServiceOutput;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.GetNotificationsAlarmServiceOutputBuilder;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.GetNotificationsServiceInput;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.GetNotificationsServiceOutput;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.GetNotificationsServiceOutputBuilder;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.NbiNotificationsService;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.get.notifications.alarm.service.output.NotificationAlarmService;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.get.notifications.service.output.NotificationService;
 import org.opendaylight.yangtools.yang.common.RpcResult;
 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
 import org.slf4j.Logger;
@@ -24,12 +29,17 @@ import org.slf4j.LoggerFactory;
 public class NbiNotificationsImpl implements NbiNotificationsService {
     private static final Logger LOG = LoggerFactory.getLogger(NbiNotificationsImpl.class);
     private final JsonStringConverter<org.opendaylight.yang.gen.v1
-        .nbi.notifications.rev201130.NotificationService> converter;
+        .nbi.notifications.rev210628.NotificationService> converterService;
+    private final JsonStringConverter<org.opendaylight.yang.gen.v1
+            .nbi.notifications.rev210628.NotificationAlarmService> converterAlarmService;
     private final String server;
 
     public NbiNotificationsImpl(JsonStringConverter<org.opendaylight.yang.gen.v1
-            .nbi.notifications.rev201130.NotificationService> converter, String server) {
-        this.converter = converter;
+            .nbi.notifications.rev210628.NotificationService> converterService,
+                                JsonStringConverter<org.opendaylight.yang.gen.v1
+            .nbi.notifications.rev210628.NotificationAlarmService> converterAlarmService, String server) {
+        this.converterService = converterService;
+        this.converterAlarmService = converterAlarmService;
         this.server = server;
     }
 
@@ -41,11 +51,28 @@ public class NbiNotificationsImpl implements NbiNotificationsService {
             LOG.warn("Missing mandatory params for input {}", input);
             return RpcResultBuilder.success(new GetNotificationsServiceOutputBuilder().build()).buildFuture();
         }
-        Subscriber subscriber = new Subscriber(input.getIdConsumer(), input.getGroupId(), server, converter);
+        Subscriber subscriber = new Subscriber(input.getIdConsumer(), input.getGroupId(), server, converterService);
         List<NotificationService> notificationServiceList = subscriber
                 .subscribeService(input.getConnectionType().getName());
         GetNotificationsServiceOutputBuilder output = new GetNotificationsServiceOutputBuilder()
                 .setNotificationService(notificationServiceList);
         return RpcResultBuilder.success(output.build()).buildFuture();
     }
+
+    @Override
+    public ListenableFuture<RpcResult<GetNotificationsAlarmServiceOutput>> getNotificationsAlarmService(
+            GetNotificationsAlarmServiceInput input) {
+        LOG.info("RPC getNotificationsAlarmService received");
+        if (input == null || input.getIdConsumer() == null || input.getGroupId() == null) {
+            LOG.warn("Missing mandatory params for input {}", input);
+            return RpcResultBuilder.success(new GetNotificationsAlarmServiceOutputBuilder().build()).buildFuture();
+        }
+        SubscriberAlarm subscriberAlarm = new SubscriberAlarm(input.getIdConsumer(), input.getGroupId(), server,
+                converterAlarmService);
+        List<NotificationAlarmService> notificationAlarmServiceList = subscriberAlarm
+                .subscribeAlarm("alarm" + input.getConnectionType().getName());
+        GetNotificationsAlarmServiceOutputBuilder output = new GetNotificationsAlarmServiceOutputBuilder()
+                .setNotificationAlarmService(notificationAlarmServiceList);
+        return RpcResultBuilder.success(output.build()).buildFuture();
+    }
 }
index 5f41034465a03cd25f17eae8238adf991cdc114a..3278416536026cfa21170a572918f6a7c432ad87 100644 (file)
@@ -16,8 +16,9 @@ import org.opendaylight.mdsal.binding.dom.codec.spi.BindingDOMCodecServices;
 import org.opendaylight.transportpce.common.converter.JsonStringConverter;
 import org.opendaylight.transportpce.nbinotifications.listener.NbiNotificationsListenerImpl;
 import org.opendaylight.transportpce.nbinotifications.producer.Publisher;
-import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.NbiNotificationsListener;
-import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.NbiNotificationsService;
+import org.opendaylight.transportpce.nbinotifications.producer.PublisherAlarm;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.NbiNotificationsListener;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.NbiNotificationsService;
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
 import org.opendaylight.yangtools.concepts.ObjectRegistration;
 import org.slf4j.Logger;
@@ -26,29 +27,37 @@ import org.slf4j.LoggerFactory;
 public class NbiNotificationsProvider {
 
     private static final Logger LOG = LoggerFactory.getLogger(NbiNotificationsProvider.class);
-    private static Map<String, Publisher> publishersMap =  new HashMap<>();
+    private static Map<String, Publisher> publishersServiceMap =  new HashMap<>();
+    private static Map<String, PublisherAlarm> publishersAlarmMap =  new HashMap<>();
 
     private final RpcProviderService rpcService;
     private ObjectRegistration<NbiNotificationsService> rpcRegistration;
     private ListenerRegistration<NbiNotificationsListener> listenerRegistration;
     private NotificationService notificationService;
     private final JsonStringConverter<org.opendaylight.yang.gen.v1
-        .nbi.notifications.rev201130.NotificationService> converter;
-    private final String suscriberServer;
+        .nbi.notifications.rev210628.NotificationService> converterService;
+    private final JsonStringConverter<org.opendaylight.yang.gen.v1
+            .nbi.notifications.rev210628.NotificationAlarmService> converterAlarmService;
+    private final String subscriberServer;
 
 
-    public NbiNotificationsProvider(List<String> topics,
-            String suscriberServer, String publisherServer,
+    public NbiNotificationsProvider(List<String> topicsService, List<String> topicsAlarm,
+            String subscriberServer, String publisherServer,
             RpcProviderService rpcProviderService, NotificationService notificationService,
             BindingDOMCodecServices bindingDOMCodecServices) {
         this.rpcService = rpcProviderService;
         this.notificationService = notificationService;
-        converter =  new JsonStringConverter<>(bindingDOMCodecServices);
-        for (String topic: topics) {
+        converterService =  new JsonStringConverter<>(bindingDOMCodecServices);
+        for (String topic: topicsService) {
+            LOG.info("Creating publisher for topic {}", topic);
+            publishersServiceMap.put(topic, new Publisher(topic, publisherServer, converterService));
+        }
+        converterAlarmService = new JsonStringConverter<>(bindingDOMCodecServices);
+        for (String topic: topicsAlarm) {
             LOG.info("Creating publisher for topic {}", topic);
-            publishersMap.put(topic, new Publisher(topic, publisherServer, converter));
+            publishersAlarmMap.put(topic, new PublisherAlarm(topic, publisherServer, converterAlarmService));
         }
-        this.suscriberServer = suscriberServer;
+        this.subscriberServer = subscriberServer;
     }
 
     /**
@@ -57,18 +66,21 @@ public class NbiNotificationsProvider {
     public void init() {
         LOG.info("NbiNotificationsProvider Session Initiated");
         rpcRegistration = rpcService.registerRpcImplementation(NbiNotificationsService.class,
-                new NbiNotificationsImpl(converter, suscriberServer));
+                new NbiNotificationsImpl(converterService, converterAlarmService, subscriberServer));
         listenerRegistration = notificationService.registerNotificationListener(
-                new NbiNotificationsListenerImpl(publishersMap));
+                new NbiNotificationsListenerImpl(publishersServiceMap, publishersAlarmMap));
     }
 
     /**
      * Method called when the blueprint container is destroyed.
      */
     public void close() {
-        for (Publisher publisher : publishersMap.values()) {
+        for (Publisher publisher : publishersServiceMap.values()) {
             publisher.close();
         }
+        for (PublisherAlarm publisherAlarm : publishersAlarmMap.values()) {
+            publisherAlarm.close();
+        }
         rpcRegistration.close();
         listenerRegistration.close();
         LOG.info("NbiNotificationsProvider Closed");
index 0bcdf6bd27cad0e86f84c9e42a8449f82103730a..2e8427f641a12b34f7d315404f0d21ebfdb61a44 100644 (file)
@@ -7,39 +7,60 @@
  */
 package org.opendaylight.transportpce.nbinotifications.listener;
 
-import java.util.HashMap;
 import java.util.Map;
 import org.opendaylight.transportpce.nbinotifications.producer.Publisher;
-import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.NbiNotificationsListener;
-import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.NotificationServiceBuilder;
-import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.PublishNotificationService;
+import org.opendaylight.transportpce.nbinotifications.producer.PublisherAlarm;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.NbiNotificationsListener;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.NotificationAlarmServiceBuilder;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.NotificationServiceBuilder;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.PublishNotificationAlarmService;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.PublishNotificationService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class NbiNotificationsListenerImpl implements NbiNotificationsListener {
     private static final Logger LOG = LoggerFactory.getLogger(NbiNotificationsListenerImpl.class);
-    private Map<String, Publisher> publishersMap =  new HashMap<>();
+    private Map<String, Publisher> publishersServiceMap;
+    private Map<String, PublisherAlarm> publishersAlarmMap;
 
-    public NbiNotificationsListenerImpl(Map<String, Publisher> publishersMap) {
-        this.publishersMap = publishersMap;
+    public NbiNotificationsListenerImpl(Map<String, Publisher> publishersServiceMap,
+                                        Map<String, PublisherAlarm> publishersAlarmMap) {
+        this.publishersServiceMap = publishersServiceMap;
+        this.publishersAlarmMap = publishersAlarmMap;
     }
 
     @Override
     public void onPublishNotificationService(PublishNotificationService notification) {
         LOG.info("Receiving request for publishing notification service");
         String topic = notification.getTopic();
-        if (!publishersMap.containsKey(topic)) {
+        if (!publishersServiceMap.containsKey(topic)) {
             LOG.error("Unknown topic {}", topic);
             return;
         }
-        Publisher publisher = publishersMap.get(topic);
+        Publisher publisher = publishersServiceMap.get(topic);
         publisher.sendEvent(new NotificationServiceBuilder().setCommonId(notification.getCommonId())
                 .setConnectionType(notification.getConnectionType()).setMessage(notification.getMessage())
                 .setOperationalState(notification.getOperationalState())
                 .setResponseFailed(notification.getResponseFailed())
-                .setServiceAEnd(notification.getServiceAEnd()).setServiceName(notification.getServiceName())
+                .setServiceAEnd(notification.getServiceAEnd())
+                .setServiceName(notification.getServiceName())
                 .setServiceZEnd(notification.getServiceZEnd()).build());
-
     }
 
+    @Override
+    public void onPublishNotificationAlarmService(PublishNotificationAlarmService notification) {
+        LOG.info("Receiving request for publishing notification alarm service");
+        String topic = notification.getTopic();
+        if (!publishersAlarmMap.containsKey(topic)) {
+            LOG.error("Unknown topic {}", topic);
+            return;
+        }
+        PublisherAlarm publisherAlarm = publishersAlarmMap.get(topic);
+        publisherAlarm.sendEvent(new NotificationAlarmServiceBuilder().setConnectionType(notification
+                .getConnectionType())
+                .setMessage(notification.getMessage())
+                .setOperationalState(notification.getOperationalState())
+                .setServiceName(notification.getServiceName())
+                .build());
+    }
 }
index 21295a9b6e5e383c0515c295560a3b410cab0af7..664a7c9a0867ad50ba07357202d877f54b57b62b 100644 (file)
@@ -18,7 +18,7 @@ import org.opendaylight.transportpce.common.converter.JsonStringConverter;
 import org.opendaylight.transportpce.nbinotifications.serialization.ConfigConstants;
 import org.opendaylight.transportpce.nbinotifications.serialization.NotificationServiceSerializer;
 import org.opendaylight.transportpce.nbinotifications.utils.NbiNotificationsUtils;
-import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.NotificationService;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.NotificationService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
diff --git a/nbinotifications/src/main/java/org/opendaylight/transportpce/nbinotifications/producer/PublisherAlarm.java b/nbinotifications/src/main/java/org/opendaylight/transportpce/nbinotifications/producer/PublisherAlarm.java
new file mode 100644 (file)
index 0000000..b275548
--- /dev/null
@@ -0,0 +1,61 @@
+/*
+ * Copyright © 2021 Orange, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.transportpce.nbinotifications.producer;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.util.Properties;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.opendaylight.transportpce.common.converter.JsonStringConverter;
+import org.opendaylight.transportpce.nbinotifications.serialization.ConfigConstants;
+import org.opendaylight.transportpce.nbinotifications.serialization.NotificationAlarmServiceSerializer;
+import org.opendaylight.transportpce.nbinotifications.utils.NbiNotificationsUtils;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.NotificationAlarmService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class PublisherAlarm {
+    private static final Logger LOG = LoggerFactory.getLogger(PublisherAlarm.class);
+
+    private final String id;
+    private final Producer<String, NotificationAlarmService> producer;
+
+    public PublisherAlarm(String id, String publisherServer, JsonStringConverter<NotificationAlarmService> serializer) {
+        Properties properties = NbiNotificationsUtils.loadProperties("publisher.properties");
+        properties.put(ProducerConfig.CLIENT_ID_CONFIG, id);
+        if (publisherServer != null && !publisherServer.isBlank()) {
+            properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, publisherServer);
+        }
+        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG , NotificationAlarmServiceSerializer.class);
+        properties.put(ConfigConstants.CONVERTER , serializer);
+        LOG.info("Creationg publisher for id {} with properties {}", id, properties);
+        producer = new KafkaProducer<>(properties);
+        this.id = id;
+    }
+
+    @VisibleForTesting
+    PublisherAlarm(String id, Producer<String, NotificationAlarmService> producer) {
+        this.producer = producer;
+        this.id = id;
+    }
+
+    public void close() {
+        producer.close();
+    }
+
+    public void sendEvent(NotificationAlarmService notificationAlarmService) {
+        LOG.info("SendEvent request to topic '{}' ", notificationAlarmService.getConnectionType().getName());
+        producer.send(new ProducerRecord<>("alarm" + notificationAlarmService.getConnectionType().getName(),
+                id, notificationAlarmService));
+        producer.flush();
+    }
+}
diff --git a/nbinotifications/src/main/java/org/opendaylight/transportpce/nbinotifications/serialization/NotificationAlarmServiceDeserializer.java b/nbinotifications/src/main/java/org/opendaylight/transportpce/nbinotifications/serialization/NotificationAlarmServiceDeserializer.java
new file mode 100644 (file)
index 0000000..4612d84
--- /dev/null
@@ -0,0 +1,65 @@
+/*
+ * Copyright © 2021 Orange, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.transportpce.nbinotifications.serialization;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.opendaylight.transportpce.common.converter.JsonStringConverter;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.get.notifications.alarm.service.output.NotificationAlarmService;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.get.notifications.alarm.service.output.NotificationAlarmServiceBuilder;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.codec.gson.JSONCodecFactorySupplier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class NotificationAlarmServiceDeserializer implements Deserializer<NotificationAlarmService> {
+    private static final Logger LOG = LoggerFactory.getLogger(NotificationAlarmServiceDeserializer.class);
+    private JsonStringConverter<org.opendaylight.yang.gen.v1
+        .nbi.notifications.rev210628.NotificationAlarmService> converter;
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public void configure(Map<String, ?> configs, boolean isKey) {
+        LOG.info("Deserializer configuration {}", configs);
+        if (configs.containsKey(ConfigConstants.CONVERTER)
+                && configs.get(ConfigConstants.CONVERTER) instanceof JsonStringConverter<?>) {
+            converter = (JsonStringConverter<org.opendaylight.yang.gen.v1
+                    .nbi.notifications.rev210628.NotificationAlarmService>) configs
+                    .get(ConfigConstants.CONVERTER);
+        }
+    }
+
+    @Override
+    public NotificationAlarmService deserialize(String topic, byte[] data) {
+        if (converter == null) {
+            throw new IllegalArgumentException(
+                    "Converter should be configured through configure method of deserializer");
+        }
+        String value = new String(data, StandardCharsets.UTF_8);
+        // The message published is
+        // org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.NotificationService
+        // we have to map it to
+        // org.opendaylight.yang.gen
+        // .v1.nbi.notifications.rev201130.get.notifications.service.output.NotificationService
+        org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.NotificationAlarmService mappedString = converter
+                .createDataObjectFromJsonString(YangInstanceIdentifier.of(
+                        org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.NotificationAlarmService.QNAME),
+                        value,
+                        JSONCodecFactorySupplier.RFC7951);
+        if (mappedString != null) {
+            LOG.info("Reading event {}", mappedString);
+            return new NotificationAlarmServiceBuilder().setConnectionType(mappedString.getConnectionType())
+                    .setMessage(mappedString.getMessage())
+                    .setOperationalState(mappedString.getOperationalState())
+                    .setServiceName(mappedString.getServiceName())
+                    .build();
+        }
+        return null;
+    }
+}
diff --git a/nbinotifications/src/main/java/org/opendaylight/transportpce/nbinotifications/serialization/NotificationAlarmServiceSerializer.java b/nbinotifications/src/main/java/org/opendaylight/transportpce/nbinotifications/serialization/NotificationAlarmServiceSerializer.java
new file mode 100644 (file)
index 0000000..c2ce676
--- /dev/null
@@ -0,0 +1,55 @@
+/*
+ * Copyright © 2021 Orange, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.transportpce.nbinotifications.serialization;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+import org.apache.kafka.common.serialization.Serializer;
+import org.opendaylight.transportpce.common.converter.JsonStringConverter;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.NotificationAlarmService;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.codec.gson.JSONCodecFactorySupplier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class NotificationAlarmServiceSerializer implements Serializer<NotificationAlarmService> {
+    private static final Logger LOG = LoggerFactory.getLogger(NotificationAlarmServiceSerializer.class);
+    private JsonStringConverter<NotificationAlarmService> converter;
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public void configure(Map<String, ?> configs, boolean isKey) {
+        LOG.info("Deserializer configuration {}", configs);
+        if (configs.containsKey(ConfigConstants.CONVERTER)
+                && configs.get(ConfigConstants.CONVERTER) instanceof JsonStringConverter<?>) {
+            converter = (JsonStringConverter<NotificationAlarmService>) configs.get(ConfigConstants.CONVERTER);
+        }
+    }
+
+    @Override
+    public byte[] serialize(String topic, NotificationAlarmService data) {
+        if (converter == null) {
+            throw new IllegalArgumentException(
+                    "Converter should be" + "configured through configure method of serializer");
+        }
+        if (data == null) {
+            return new byte[0];
+        }
+        try {
+            InstanceIdentifier<NotificationAlarmService> iid =
+                    InstanceIdentifier.builder(NotificationAlarmService.class).build();
+            String serialized = converter.createJsonStringFromDataObject(iid, data, JSONCodecFactorySupplier.RFC7951);
+            LOG.info("Serialized event {}", serialized);
+            return serialized.getBytes(StandardCharsets.UTF_8);
+        } catch (IOException e) {
+            return new byte[0];
+        }
+    }
+}
index 95f10fb639657813553656329d8d8244821ce8f5..81f02ac9451f230d62e4dfdf127b8c90b339efb8 100644 (file)
@@ -11,8 +11,8 @@ import java.nio.charset.StandardCharsets;
 import java.util.Map;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.opendaylight.transportpce.common.converter.JsonStringConverter;
-import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.get.notifications.service.output.NotificationService;
-import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.get.notifications.service.output.NotificationServiceBuilder;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.get.notifications.service.output.NotificationService;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.get.notifications.service.output.NotificationServiceBuilder;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.codec.gson.JSONCodecFactorySupplier;
 import org.slf4j.Logger;
@@ -21,7 +21,7 @@ import org.slf4j.LoggerFactory;
 public class NotificationServiceDeserializer implements Deserializer<NotificationService> {
     private static final Logger LOG = LoggerFactory.getLogger(NotificationServiceDeserializer.class);
     private JsonStringConverter<org.opendaylight.yang.gen.v1
-        .nbi.notifications.rev201130.NotificationService> converter;
+        .nbi.notifications.rev210628.NotificationService> converter;
 
     @SuppressWarnings("unchecked")
     @Override
@@ -30,7 +30,7 @@ public class NotificationServiceDeserializer implements Deserializer<Notificatio
         if (configs.containsKey(ConfigConstants.CONVERTER)
                 && configs.get(ConfigConstants.CONVERTER) instanceof JsonStringConverter<?>) {
             converter = (JsonStringConverter<org.opendaylight.yang.gen.v1
-                    .nbi.notifications.rev201130.NotificationService>) configs
+                    .nbi.notifications.rev210628.NotificationService>) configs
                     .get(ConfigConstants.CONVERTER);
         }
     }
@@ -47,9 +47,9 @@ public class NotificationServiceDeserializer implements Deserializer<Notificatio
         // we have to map it to
         // org.opendaylight.yang.gen
         // .v1.nbi.notifications.rev201130.get.notifications.service.output.NotificationService
-        org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.NotificationService mappedString = converter
+        org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.NotificationService mappedString = converter
                 .createDataObjectFromJsonString(YangInstanceIdentifier.of(
-                        org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.NotificationService.QNAME),
+                        org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.NotificationService.QNAME),
                         value,
                         JSONCodecFactorySupplier.RFC7951);
         if (mappedString != null) {
index eb998746f9a1f25349f49d097921c008579ffa2a..bc99d90c9cb55e577b07ecf4ea91d71b0ecd4cab 100644 (file)
@@ -12,7 +12,7 @@ import java.nio.charset.StandardCharsets;
 import java.util.Map;
 import org.apache.kafka.common.serialization.Serializer;
 import org.opendaylight.transportpce.common.converter.JsonStringConverter;
-import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.NotificationService;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.NotificationService;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.codec.gson.JSONCodecFactorySupplier;
 import org.slf4j.Logger;
index 9b3a3bd7fd9339b7ff574f974740b6deaf55b3c1..d938b9d28d1608a29e35effd823a46fe8deff714 100644 (file)
@@ -26,6 +26,11 @@ xmlns:cm="http://aries.apache.org/blueprint/xmlns/blueprint-cm/v1.1.0"
                 <value>RendererListener</value>
             </list>
         </argument>
+        <argument>
+            <list value-type="java.lang.String">
+                <value>ServiceListener</value>
+            </list>
+        </argument>
         <argument value="${suscriber.server}"/>
         <argument value="${publisher.server}"/>
         <argument ref="rpcService" />
index de58a498af4234e7635911e37d7af075a9d7ef3d..5427020e1622403646cf2879fa9eacec29b9e81d 100644 (file)
@@ -22,7 +22,7 @@ import org.junit.Before;
 import org.junit.Test;
 import org.opendaylight.transportpce.nbinotifications.utils.NotificationServiceDataUtils;
 import org.opendaylight.transportpce.test.AbstractTest;
-import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.get.notifications.service.output.NotificationService;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.get.notifications.service.output.NotificationService;
 
 public class SubscriberTest extends AbstractTest {
     private static final String TOPIC = "topic";
index da825509a7470b57800f44e0da56c9720bf7cc39..9de03934c29522d7bc7a8c5fd9b8cf941cf81718 100644 (file)
@@ -16,8 +16,8 @@ import org.junit.Test;
 import org.opendaylight.transportpce.common.converter.JsonStringConverter;
 import org.opendaylight.transportpce.test.AbstractTest;
 import org.opendaylight.yang.gen.v1.http.org.openroadm.common.service.types.rev190531.ConnectionType;
-import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.GetNotificationsServiceInputBuilder;
-import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.GetNotificationsServiceOutput;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.GetNotificationsServiceInputBuilder;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.GetNotificationsServiceOutput;
 import org.opendaylight.yangtools.yang.common.RpcResult;
 
 public class NbiNotificationsImplTest extends AbstractTest {
@@ -26,9 +26,12 @@ public class NbiNotificationsImplTest extends AbstractTest {
     @Before
     public void setUp() {
         JsonStringConverter<org.opendaylight.yang.gen.v1
-            .nbi.notifications.rev201130.NotificationService> converter = new JsonStringConverter<>(
+            .nbi.notifications.rev210628.NotificationService> converter = new JsonStringConverter<>(
                 getDataStoreContextUtil().getBindingDOMCodecServices());
-        nbiNotificationsImpl = new NbiNotificationsImpl(converter, "localhost:8080");
+        JsonStringConverter<org.opendaylight.yang.gen.v1
+                .nbi.notifications.rev210628.NotificationAlarmService> converterAlarm = new JsonStringConverter<>(
+                getDataStoreContextUtil().getBindingDOMCodecServices());
+        nbiNotificationsImpl = new NbiNotificationsImpl(converter, converterAlarm,"localhost:8080");
     }
 
     public void getNotificationsServiceEmptyDataTest() throws InterruptedException, ExecutionException {
index 178fc4243727a70df2401aa6d2be6b0be70f8614..2b917aab040360084fd2006133e0f0421b1b6414 100644 (file)
@@ -38,8 +38,8 @@ public class NbiNotificationsProviderTest  extends AbstractTest {
     @Test
     public void initTest() {
         NbiNotificationsProvider provider = new NbiNotificationsProvider(
-                Arrays.asList("topic1", "topic2"), "localhost:8080", "localhost:8080",
-                rpcProviderRegistry, notificationService,
+                Arrays.asList("topic1", "topic2"), Arrays.asList("topic1", "topic2"), "localhost:8080",
+                "localhost:8080", rpcProviderRegistry, notificationService,
                 getDataStoreContextUtil().getBindingDOMCodecServices());
         provider.init();
         verify(rpcProviderRegistry, times(1))
index cb8faf242d1949277ca54644862428b38ffa99b0..899baed01718849435cfda050030ae6bb104398d 100644 (file)
@@ -17,15 +17,18 @@ import org.junit.Test;
 import org.mockito.Mock;
 import org.mockito.MockitoAnnotations;
 import org.opendaylight.transportpce.nbinotifications.producer.Publisher;
+import org.opendaylight.transportpce.nbinotifications.producer.PublisherAlarm;
 import org.opendaylight.transportpce.test.AbstractTest;
 import org.opendaylight.yang.gen.v1.http.org.openroadm.common.service.types.rev190531.ConnectionType;
 import org.opendaylight.yang.gen.v1.http.org.openroadm.common.state.types.rev181130.State;
-import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.PublishNotificationService;
-import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.PublishNotificationServiceBuilder;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.PublishNotificationService;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.PublishNotificationServiceBuilder;
 
 public class NbiNotificationsListenerImplTest extends AbstractTest {
     @Mock
     private Publisher publisher;
+    @Mock
+    private PublisherAlarm publisherAlarm;
 
     @Before
     public void setUp() {
@@ -34,7 +37,8 @@ public class NbiNotificationsListenerImplTest extends AbstractTest {
 
     @Test
     public void onPublishNotificationServiceTest() {
-        NbiNotificationsListenerImpl listener = new NbiNotificationsListenerImpl(Map.of("test", publisher));
+        NbiNotificationsListenerImpl listener = new NbiNotificationsListenerImpl(Map.of("test", publisher),
+                Map.of("test", publisherAlarm));
         PublishNotificationService notification = new PublishNotificationServiceBuilder().setTopic("test")
                 .setCommonId("commonId").setConnectionType(ConnectionType.Service).setMessage("Service deleted")
                 .setOperationalState(State.OutOfService).setServiceName("service name").build();
@@ -44,7 +48,8 @@ public class NbiNotificationsListenerImplTest extends AbstractTest {
 
     @Test
     public void onPublishNotificationServiceWrongTopicTest() {
-        NbiNotificationsListenerImpl listener = new NbiNotificationsListenerImpl(Map.of("test", publisher));
+        NbiNotificationsListenerImpl listener = new NbiNotificationsListenerImpl(Map.of("test", publisher),
+                Map.of("test", publisherAlarm));
         PublishNotificationService notification = new PublishNotificationServiceBuilder().setTopic("wrongtopic")
                 .setCommonId("commonId").setConnectionType(ConnectionType.Service).setMessage("Service deleted")
                 .setOperationalState(State.OutOfService).setServiceName("service name").build();
index b3a5b2edbeb5b5a858eb377f012b8c3f70c3c669..f738fd71d83569790b5260e8f624795d7564632b 100644 (file)
@@ -21,7 +21,7 @@ import org.opendaylight.transportpce.common.converter.JsonStringConverter;
 import org.opendaylight.transportpce.nbinotifications.serialization.ConfigConstants;
 import org.opendaylight.transportpce.nbinotifications.serialization.NotificationServiceSerializer;
 import org.opendaylight.transportpce.test.AbstractTest;
-import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.NotificationService;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.NotificationService;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.codec.gson.JSONCodecFactorySupplier;
 
index 928c2471a200594a984121b6252ec259ea8be82c..430e7d8e2493f4496dff5157d7c281cd605ac925 100644 (file)
@@ -16,13 +16,13 @@ import java.util.Map;
 import org.junit.Test;
 import org.opendaylight.transportpce.common.converter.JsonStringConverter;
 import org.opendaylight.transportpce.test.AbstractTest;
-import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.get.notifications.service.output.NotificationService;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.get.notifications.service.output.NotificationService;
 
 public class NotificationServiceDeserializerTest extends AbstractTest {
 
     @Test
     public void deserializeTest() throws IOException {
-        JsonStringConverter<org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.NotificationService> converter =
+        JsonStringConverter<org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.NotificationService> converter =
                 new JsonStringConverter<>(getDataStoreContextUtil().getBindingDOMCodecServices());
         NotificationServiceDeserializer deserializer = new NotificationServiceDeserializer();
         Map<String, Object> configs = Map.of(ConfigConstants.CONVERTER, converter);
index 3fe658b6bdafd1a46c69bf82e1d7a447817310e1..ca11dda9ce0ff2cfbf781190f852f6f345ad43f5 100644 (file)
@@ -10,6 +10,8 @@ package org.opendaylight.transportpce.nbinotifications.serialization;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
@@ -18,7 +20,7 @@ import java.util.Map;
 import org.junit.Test;
 import org.opendaylight.transportpce.common.converter.JsonStringConverter;
 import org.opendaylight.transportpce.test.AbstractTest;
-import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.NotificationService;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.NotificationService;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.codec.gson.JSONCodecFactorySupplier;
 
@@ -39,6 +41,8 @@ public class NotificationServiceSerializerTest extends AbstractTest {
         serializer.close();
         assertNotNull("Serialized data should not be null", data);
         String expectedJson = Files.readString(Paths.get("src/test/resources/expected_event.json"));
+        // Minify the json string
+        expectedJson = new ObjectMapper().readValue(expectedJson, JsonNode.class).toString();
         assertEquals("The event should be equals", expectedJson, new String(data, StandardCharsets.UTF_8));
     }
 }
index 44b6958884e24ea6c3a1a66c7be501b78294daab..0921e99c67c38a169739b766902a1e4ff4086407 100644 (file)
@@ -15,10 +15,10 @@ import org.opendaylight.yang.gen.v1.http.org.openroadm.common.service.types.rev1
 import org.opendaylight.yang.gen.v1.http.org.openroadm.common.service.types.rev190531.service.port.PortBuilder;
 import org.opendaylight.yang.gen.v1.http.org.openroadm.common.state.types.rev181130.State;
 import org.opendaylight.yang.gen.v1.http.org.openroadm.service.format.rev190531.ServiceFormat;
-import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.NotificationService;
-import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.NotificationServiceBuilder;
-import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.notification.service.ServiceAEndBuilder;
-import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.notification.service.ServiceZEndBuilder;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.NotificationService;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.NotificationServiceBuilder;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.notification.service.ServiceAEndBuilder;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.notification.service.ServiceZEndBuilder;
 import org.opendaylight.yangtools.yang.common.Uint32;
 
 public final class NotificationServiceDataUtils {
@@ -41,11 +41,11 @@ public final class NotificationServiceDataUtils {
     }
 
     public static org.opendaylight.yang.gen.v1
-        .nbi.notifications.rev201130.get.notifications.service.output.NotificationService buildReceivedEvent() {
+        .nbi.notifications.rev210628.get.notifications.service.output.NotificationService buildReceivedEvent() {
         org.opendaylight.yang.gen.v1
-            .nbi.notifications.rev201130.get.notifications.service.output.NotificationServiceBuilder
+            .nbi.notifications.rev210628.get.notifications.service.output.NotificationServiceBuilder
             notificationServiceBuilder = new org.opendaylight.yang.gen.v1
-            .nbi.notifications.rev201130.get.notifications.service.output.NotificationServiceBuilder()
+            .nbi.notifications.rev210628.get.notifications.service.output.NotificationServiceBuilder()
                 .setMessage("message")
                 .setServiceName("service1")
                 .setOperationalState(State.InService)
index 63c7bec737db5f9b003c7c613560f2224d591f86..9e85e38f27a774b1bfc8f372d6d304e74b739421 100644 (file)
@@ -1,7 +1,6 @@
 {
     "notification-service": {
-        "service-name": "service1",
-        "service-a-end": {
+        "service-z-end": {
             "service-format": "OC",
             "node-id": "XPONDER-1-2",
             "service-rate": 1,
                 }
             }
         },
-        "common-id": "commond-id",
-        "operational-state": "inService",
         "connection-type": "service",
-        "service-z-end": {
+        "operational-state": "inService",
+        "common-id": "commond-id",
+        "service-a-end": {
             "service-format": "OC",
             "node-id": "XPONDER-1-2",
             "service-rate": 1,
@@ -84,6 +83,7 @@
                 }
             }
         },
+        "service-name": "service1",
         "message": "message",
         "response-failed": ""
     }
index 2b8924df381b4e33b2b5d4d59713f9d759ed86f1..499c7e47a5e843be835e3c0d269301c1c47840c0 100644 (file)
@@ -1 +1,90 @@
-{"notification-service":{"service-name":"service1","service-a-end":{"service-format":"OC","node-id":"XPONDER-1-2","service-rate":1,"clli":"clli","tx-direction":{"port":{"port-slot":"port slot","port-device-name":"device name","port-rack":"port rack","port-shelf":"port shelf","port-type":"port type","port-sub-slot":"port subslot","port-name":"port name"},"lgx":{"lgx-port-name":"lgx port name","lgx-port-shelf":"lgx port shelf","lgx-port-rack":"lgx port rack","lgx-device-name":"lgx device name"}},"rx-direction":{"port":{"port-slot":"port slot","port-device-name":"device name","port-rack":"port rack","port-shelf":"port shelf","port-type":"port type","port-sub-slot":"port subslot","port-name":"port name"},"lgx":{"lgx-port-name":"lgx port name","lgx-port-shelf":"lgx port shelf","lgx-port-rack":"lgx port rack","lgx-device-name":"lgx device name"}}},"common-id":"commond-id","operational-state":"inService","connection-type":"service","service-z-end":{"service-format":"OC","node-id":"XPONDER-1-2","service-rate":1,"clli":"clli","tx-direction":{"port":{"port-slot":"port slot","port-device-name":"device name","port-rack":"port rack","port-shelf":"port shelf","port-type":"port type","port-sub-slot":"port subslot","port-name":"port name"},"lgx":{"lgx-port-name":"lgx port name","lgx-port-shelf":"lgx port shelf","lgx-port-rack":"lgx port rack","lgx-device-name":"lgx device name"}},"rx-direction":{"port":{"port-slot":"port slot","port-device-name":"device name","port-rack":"port rack","port-shelf":"port shelf","port-type":"port type","port-sub-slot":"port subslot","port-name":"port name"},"lgx":{"lgx-port-name":"lgx port name","lgx-port-shelf":"lgx port shelf","lgx-port-rack":"lgx port rack","lgx-device-name":"lgx device name"}}},"message":"message","response-failed":""}}
\ No newline at end of file
+{
+  "notification-service": {
+    "service-z-end": {
+      "clli": "clli",
+      "service-rate": 1,
+      "node-id": "XPONDER-1-2",
+      "rx-direction": {
+        "lgx": {
+          "lgx-port-name": "lgx port name",
+          "lgx-port-shelf": "lgx port shelf",
+          "lgx-port-rack": "lgx port rack",
+          "lgx-device-name": "lgx device name"
+        },
+        "port": {
+          "port-shelf": "port shelf",
+          "port-rack": "port rack",
+          "port-device-name": "device name",
+          "port-name": "port name",
+          "port-slot": "port slot",
+          "port-type": "port type",
+          "port-sub-slot": "port subslot"
+        }
+      },
+      "service-format": "OC",
+      "tx-direction": {
+        "lgx": {
+          "lgx-port-name": "lgx port name",
+          "lgx-port-shelf": "lgx port shelf",
+          "lgx-port-rack": "lgx port rack",
+          "lgx-device-name": "lgx device name"
+        },
+        "port": {
+          "port-shelf": "port shelf",
+          "port-rack": "port rack",
+          "port-device-name": "device name",
+          "port-name": "port name",
+          "port-slot": "port slot",
+          "port-type": "port type",
+          "port-sub-slot": "port subslot"
+        }
+      }
+    },
+    "connection-type": "service",
+    "operational-state": "inService",
+    "common-id": "commond-id",
+    "service-a-end": {
+      "clli": "clli",
+      "service-rate": 1,
+      "node-id": "XPONDER-1-2",
+      "rx-direction": {
+        "lgx": {
+          "lgx-port-name": "lgx port name",
+          "lgx-port-shelf": "lgx port shelf",
+          "lgx-port-rack": "lgx port rack",
+          "lgx-device-name": "lgx device name"
+        },
+        "port": {
+          "port-shelf": "port shelf",
+          "port-rack": "port rack",
+          "port-device-name": "device name",
+          "port-name": "port name",
+          "port-slot": "port slot",
+          "port-type": "port type",
+          "port-sub-slot": "port subslot"
+        }
+      },
+      "service-format": "OC",
+      "tx-direction": {
+        "lgx": {
+          "lgx-port-name": "lgx port name",
+          "lgx-port-shelf": "lgx port shelf",
+          "lgx-port-rack": "lgx port rack",
+          "lgx-device-name": "lgx device name"
+        },
+        "port": {
+          "port-shelf": "port shelf",
+          "port-rack": "port rack",
+          "port-device-name": "device name",
+          "port-name": "port name",
+          "port-slot": "port slot",
+          "port-type": "port type",
+          "port-sub-slot": "port subslot"
+        }
+      }
+    },
+    "service-name": "service1",
+    "response-failed": "",
+    "message": "message"
+  }
+}
\ No newline at end of file
index 3774f81397c45e5efb4b850b4e60cb81e173839b..e176348fe2e64246d2ed8a20dc8feb436577e9db 100644 (file)
@@ -80,10 +80,10 @@ import org.opendaylight.yang.gen.v1.http.org.openroadm.service.rev190531.TempSer
 import org.opendaylight.yang.gen.v1.http.org.openroadm.service.rev190531.service.delete.input.ServiceDeleteReqInfo.TailRetention;
 import org.opendaylight.yang.gen.v1.http.org.openroadm.service.rev190531.service.delete.input.ServiceDeleteReqInfoBuilder;
 import org.opendaylight.yang.gen.v1.http.org.openroadm.service.rev190531.service.list.Services;
-import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.PublishNotificationService;
-import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.PublishNotificationServiceBuilder;
-import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.notification.service.ServiceAEndBuilder;
-import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.notification.service.ServiceZEndBuilder;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.PublishNotificationService;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.PublishNotificationServiceBuilder;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.notification.service.ServiceAEndBuilder;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.notification.service.ServiceZEndBuilder;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.yang.types.rev130715.DateAndTime;
 import org.opendaylight.yangtools.yang.common.RpcResult;
 import org.slf4j.Logger;
index 0fec8b9e7957428afa7b4a3b8a9c8872813f3c06..8d29234c20cadb8032375687cda77dddb2f7d853 100644 (file)
@@ -9,18 +9,24 @@
 package org.opendaylight.transportpce.servicehandler.impl;
 
 import org.opendaylight.mdsal.binding.api.DataBroker;
+import org.opendaylight.mdsal.binding.api.DataTreeIdentifier;
 import org.opendaylight.mdsal.binding.api.NotificationService;
 import org.opendaylight.mdsal.binding.api.RpcProviderService;
+import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
 import org.opendaylight.transportpce.servicehandler.listeners.NetworkModelListenerImpl;
 import org.opendaylight.transportpce.servicehandler.listeners.PceListenerImpl;
 import org.opendaylight.transportpce.servicehandler.listeners.RendererListenerImpl;
+import org.opendaylight.transportpce.servicehandler.listeners.ServiceListener;
 import org.opendaylight.transportpce.servicehandler.service.ServiceDataStoreOperations;
 import org.opendaylight.yang.gen.v1.http.org.opendaylight.transportpce.networkmodel.rev201116.TransportpceNetworkmodelListener;
 import org.opendaylight.yang.gen.v1.http.org.opendaylight.transportpce.pce.rev210701.TransportpcePceListener;
 import org.opendaylight.yang.gen.v1.http.org.opendaylight.transportpce.renderer.rev210618.TransportpceRendererListener;
 import org.opendaylight.yang.gen.v1.http.org.openroadm.service.rev190531.OrgOpenroadmServiceService;
+import org.opendaylight.yang.gen.v1.http.org.openroadm.service.rev190531.ServiceList;
+import org.opendaylight.yang.gen.v1.http.org.openroadm.service.rev190531.service.list.Services;
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
 import org.opendaylight.yangtools.concepts.ObjectRegistration;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -33,23 +39,27 @@ import org.slf4j.LoggerFactory;
 public class ServicehandlerProvider {
 
     private static final Logger LOG = LoggerFactory.getLogger(ServicehandlerProvider.class);
+    private static final InstanceIdentifier<Services> SERVICE = InstanceIdentifier.builder(ServiceList.class)
+            .child(Services.class).build();
 
     private final DataBroker dataBroker;
     private final RpcProviderService rpcService;
     private final NotificationService notificationService;
     private ListenerRegistration<TransportpcePceListener> pcelistenerRegistration;
+    private ListenerRegistration<ServiceListener> serviceDataTreeChangeListenerRegistration;
     private ListenerRegistration<TransportpceRendererListener> rendererlistenerRegistration;
     private ListenerRegistration<TransportpceNetworkmodelListener> networkmodellistenerRegistration;
     private ObjectRegistration<OrgOpenroadmServiceService> rpcRegistration;
     private ServiceDataStoreOperations serviceDataStoreOperations;
     private PceListenerImpl pceListenerImpl;
+    private ServiceListener serviceListener;
     private RendererListenerImpl rendererListenerImpl;
     private NetworkModelListenerImpl networkModelListenerImpl;
     private ServicehandlerImpl servicehandler;
 
     public ServicehandlerProvider(final DataBroker dataBroker, RpcProviderService rpcProviderService,
             NotificationService notificationService, ServiceDataStoreOperations serviceDataStoreOperations,
-            PceListenerImpl pceListenerImpl, RendererListenerImpl rendererListenerImpl,
+            PceListenerImpl pceListenerImpl, ServiceListener serviceListener, RendererListenerImpl rendererListenerImpl,
             NetworkModelListenerImpl networkModelListenerImpl, ServicehandlerImpl servicehandler) {
         this.dataBroker = dataBroker;
         this.rpcService = rpcProviderService;
@@ -57,6 +67,7 @@ public class ServicehandlerProvider {
         this.serviceDataStoreOperations = serviceDataStoreOperations;
         this.serviceDataStoreOperations.initialize();
         this.pceListenerImpl = pceListenerImpl;
+        this.serviceListener = serviceListener;
         this.rendererListenerImpl = rendererListenerImpl;
         this.networkModelListenerImpl = networkModelListenerImpl;
         this.servicehandler = servicehandler;
@@ -68,6 +79,8 @@ public class ServicehandlerProvider {
     public void init() {
         LOG.info("ServicehandlerProvider Session Initiated");
         pcelistenerRegistration = notificationService.registerNotificationListener(pceListenerImpl);
+        serviceDataTreeChangeListenerRegistration = dataBroker.registerDataTreeChangeListener(
+                DataTreeIdentifier.create(LogicalDatastoreType.OPERATIONAL, SERVICE), serviceListener);
         rendererlistenerRegistration = notificationService.registerNotificationListener(rendererListenerImpl);
         networkmodellistenerRegistration = notificationService.registerNotificationListener(networkModelListenerImpl);
         rpcRegistration = rpcService.registerRpcImplementation(OrgOpenroadmServiceService.class, servicehandler);
@@ -79,6 +92,7 @@ public class ServicehandlerProvider {
     public void close() {
         LOG.info("ServicehandlerProvider Closed");
         pcelistenerRegistration.close();
+        serviceDataTreeChangeListenerRegistration.close();
         rendererlistenerRegistration.close();
         networkmodellistenerRegistration.close();
         rpcRegistration.close();
index 8782bb79c46311a9c4433deadada08a160353770..305c4156ee37b4cafe11e918c55d07aaf1ffbd31 100644 (file)
@@ -28,10 +28,10 @@ import org.opendaylight.yang.gen.v1.http.org.openroadm.service.rev190531.service
 import org.opendaylight.yang.gen.v1.http.org.transportpce.b.c._interface.service.types.rev200128.RpcStatusEx;
 import org.opendaylight.yang.gen.v1.http.org.transportpce.b.c._interface.service.types.rev200128.response.parameters.sp.ResponseParameters;
 import org.opendaylight.yang.gen.v1.http.org.transportpce.b.c._interface.service.types.rev200128.response.parameters.sp.ResponseParametersBuilder;
-import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.PublishNotificationService;
-import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.PublishNotificationServiceBuilder;
-import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.notification.service.ServiceAEndBuilder;
-import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.notification.service.ServiceZEndBuilder;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.PublishNotificationService;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.PublishNotificationServiceBuilder;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.notification.service.ServiceAEndBuilder;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.notification.service.ServiceZEndBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
index 82eb677604059711435d25c8cdbfadb1615d43db..b8519b6ff9a9cf7e095f008e610160074eda6df2 100644 (file)
@@ -26,10 +26,10 @@ import org.opendaylight.yang.gen.v1.http.org.openroadm.common.state.types.rev191
 import org.opendaylight.yang.gen.v1.http.org.openroadm.equipment.states.types.rev191129.AdminStates;
 import org.opendaylight.yang.gen.v1.http.org.openroadm.service.rev190531.service.list.Services;
 import org.opendaylight.yang.gen.v1.http.transportpce.topology.rev210511.OtnLinkType;
-import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.PublishNotificationService;
-import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.PublishNotificationServiceBuilder;
-import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.notification.service.ServiceAEndBuilder;
-import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.notification.service.ServiceZEndBuilder;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.PublishNotificationService;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.PublishNotificationServiceBuilder;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.notification.service.ServiceAEndBuilder;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.notification.service.ServiceZEndBuilder;
 import org.opendaylight.yangtools.yang.common.Uint32;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
diff --git a/servicehandler/src/main/java/org/opendaylight/transportpce/servicehandler/listeners/ServiceListener.java b/servicehandler/src/main/java/org/opendaylight/transportpce/servicehandler/listeners/ServiceListener.java
new file mode 100644 (file)
index 0000000..ee64141
--- /dev/null
@@ -0,0 +1,92 @@
+/*
+ * Copyright © 2021 Orange and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.transportpce.servicehandler.listeners;
+
+import java.util.Collection;
+import org.opendaylight.mdsal.binding.api.DataBroker;
+import org.opendaylight.mdsal.binding.api.DataObjectModification;
+import org.opendaylight.mdsal.binding.api.DataTreeChangeListener;
+import org.opendaylight.mdsal.binding.api.DataTreeModification;
+import org.opendaylight.mdsal.binding.api.NotificationPublishService;
+import org.opendaylight.yang.gen.v1.http.org.openroadm.common.state.types.rev181130.State;
+import org.opendaylight.yang.gen.v1.http.org.openroadm.service.rev190531.service.list.Services;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.PublishNotificationAlarmService;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.PublishNotificationAlarmServiceBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ServiceListener implements DataTreeChangeListener<Services> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ServiceListener.class);
+    private static final String TOPIC = "ServiceListener";
+    private final DataBroker dataBroker;
+    private NotificationPublishService notificationPublishService;
+
+    public ServiceListener(final DataBroker dataBroker, NotificationPublishService notificationPublishService) {
+        this.dataBroker = dataBroker;
+        this.notificationPublishService = notificationPublishService;
+    }
+
+    public void onDataTreeChanged(Collection<DataTreeModification<Services>> changes) {
+        LOG.info("onDataTreeChanged - {}", this.getClass().getSimpleName());
+        for (DataTreeModification<Services> change : changes) {
+            DataObjectModification<Services> rootService = change.getRootNode();
+            if (rootService.getDataBefore() == null) {
+                continue;
+            }
+            String serviceName = rootService.getDataBefore().key().getServiceName();
+            switch (rootService.getModificationType()) {
+                case DELETE:
+                    LOG.info("Service {} correctly deleted from controller", serviceName);
+                    break;
+                case WRITE:
+                    Services input = rootService.getDataAfter();
+                    if (rootService.getDataBefore().getOperationalState() == State.InService
+                            && rootService.getDataAfter().getOperationalState() == State.OutOfService) {
+                        LOG.info("Service {} is becoming outOfService", serviceName);
+                        sendNbiNotification(new PublishNotificationAlarmServiceBuilder()
+                                .setServiceName(input.getServiceName())
+                                .setConnectionType(input.getConnectionType())
+                                .setMessage("The service is now outOfService")
+                                .setOperationalState(State.OutOfService)
+                                .setTopic(TOPIC)
+                                .build());
+                    }
+                    else if (rootService.getDataBefore().getOperationalState() == State.OutOfService
+                            && rootService.getDataAfter().getOperationalState() == State.InService) {
+                        LOG.info("Service {} is becoming InService", serviceName);
+                        sendNbiNotification(new PublishNotificationAlarmServiceBuilder()
+                                .setServiceName(input.getServiceName())
+                                .setConnectionType(input.getConnectionType())
+                                .setMessage("The service is now inService")
+                                .setOperationalState(State.InService)
+                                .setTopic(TOPIC)
+                                .build());
+                    }
+                    break;
+                default:
+                    LOG.debug("Unknown modification type {}", rootService.getModificationType().name());
+                    break;
+            }
+        }
+    }
+
+    /**
+     * Send notification to NBI notification in order to publish message.
+     *
+     * @param service PublishNotificationAlarmService
+     */
+    private void sendNbiNotification(PublishNotificationAlarmService service) {
+        try {
+            notificationPublishService.putNotification(service);
+        } catch (InterruptedException e) {
+            LOG.warn("Cannot send notification to nbi", e);
+            Thread.currentThread().interrupt();
+        }
+    }
+}
index 62a7e640022c5f6c316d574d265e396a58544e97..8e2ba983b991f65cc95946dd59a754fd8950c54c 100644 (file)
@@ -43,6 +43,11 @@ Author: Martial Coulibaly <martial.coulibaly@gfi.com> on behalf of Orange
         <argument ref="serviceDatastoreOperation" />
     </bean>
 
+    <bean id="serviceListener" class="org.opendaylight.transportpce.servicehandler.listeners.ServiceListener">
+        <argument ref="dataBroker" />
+        <argument ref="notificationPublishService" />
+    </bean>
+
     <bean id="rendererListener" class="org.opendaylight.transportpce.servicehandler.listeners.RendererListenerImpl">
         <argument ref="pathComputationService" />
         <argument ref="notificationPublishService" />
@@ -74,6 +79,7 @@ Author: Martial Coulibaly <martial.coulibaly@gfi.com> on behalf of Orange
         <argument ref="notificationService" />
         <argument ref="serviceDatastoreOperation" />
         <argument ref="pceListener" />
+        <argument ref="serviceListener" />
         <argument ref="rendererListener" />
         <argument ref="networkModelListener" />
         <argument ref="serviceHandlerImpl" />
index e458df832683dc7bf58b06f801ff9785c30d36c5..85b51d7cbf842a9e2ed1f9332b7498aa6997bbc8 100644 (file)
@@ -20,6 +20,7 @@ import org.opendaylight.mdsal.binding.api.RpcProviderService;
 import org.opendaylight.transportpce.servicehandler.listeners.NetworkModelListenerImpl;
 import org.opendaylight.transportpce.servicehandler.listeners.PceListenerImpl;
 import org.opendaylight.transportpce.servicehandler.listeners.RendererListenerImpl;
+import org.opendaylight.transportpce.servicehandler.listeners.ServiceListener;
 import org.opendaylight.transportpce.servicehandler.service.ServiceDataStoreOperations;
 import org.opendaylight.transportpce.test.AbstractTest;
 
@@ -34,6 +35,9 @@ public class ServicehandlerProviderTest  extends AbstractTest {
     @Mock
     PceListenerImpl pceListenerImpl;
 
+    @Mock
+    ServiceListener serviceListener;
+
     @Mock
     RendererListenerImpl rendererListenerImpl;
 
@@ -43,6 +47,7 @@ public class ServicehandlerProviderTest  extends AbstractTest {
     @Mock
     ServicehandlerImpl servicehandler;
 
+
     private AutoCloseable closeable;
 
     @Before
@@ -54,8 +59,8 @@ public class ServicehandlerProviderTest  extends AbstractTest {
     public void testInitRegisterServiceHandlerToRpcRegistry() {
         ServicehandlerProvider provider =  new ServicehandlerProvider(
                 getDataBroker(), rpcProviderRegistry,
-                getNotificationService() , serviceDataStoreOperations, pceListenerImpl, rendererListenerImpl,
-                networkModelListenerImpl, servicehandler);
+                getNotificationService() , serviceDataStoreOperations, pceListenerImpl, serviceListener,
+                rendererListenerImpl, networkModelListenerImpl, servicehandler);
 
         provider.init();