Add service listener to notify Kafka
[transportpce.git] / nbinotifications / src / main / java / org / opendaylight / transportpce / nbinotifications / impl / NbiNotificationsProvider.java
index 5f41034465a03cd25f17eae8238adf991cdc114a..3278416536026cfa21170a572918f6a7c432ad87 100644 (file)
@@ -16,8 +16,9 @@ import org.opendaylight.mdsal.binding.dom.codec.spi.BindingDOMCodecServices;
 import org.opendaylight.transportpce.common.converter.JsonStringConverter;
 import org.opendaylight.transportpce.nbinotifications.listener.NbiNotificationsListenerImpl;
 import org.opendaylight.transportpce.nbinotifications.producer.Publisher;
-import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.NbiNotificationsListener;
-import org.opendaylight.yang.gen.v1.nbi.notifications.rev201130.NbiNotificationsService;
+import org.opendaylight.transportpce.nbinotifications.producer.PublisherAlarm;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.NbiNotificationsListener;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.NbiNotificationsService;
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
 import org.opendaylight.yangtools.concepts.ObjectRegistration;
 import org.slf4j.Logger;
@@ -26,29 +27,37 @@ import org.slf4j.LoggerFactory;
 public class NbiNotificationsProvider {
 
     private static final Logger LOG = LoggerFactory.getLogger(NbiNotificationsProvider.class);
-    private static Map<String, Publisher> publishersMap =  new HashMap<>();
+    private static Map<String, Publisher> publishersServiceMap =  new HashMap<>();
+    private static Map<String, PublisherAlarm> publishersAlarmMap =  new HashMap<>();
 
     private final RpcProviderService rpcService;
     private ObjectRegistration<NbiNotificationsService> rpcRegistration;
     private ListenerRegistration<NbiNotificationsListener> listenerRegistration;
     private NotificationService notificationService;
     private final JsonStringConverter<org.opendaylight.yang.gen.v1
-        .nbi.notifications.rev201130.NotificationService> converter;
-    private final String suscriberServer;
+        .nbi.notifications.rev210628.NotificationService> converterService;
+    private final JsonStringConverter<org.opendaylight.yang.gen.v1
+            .nbi.notifications.rev210628.NotificationAlarmService> converterAlarmService;
+    private final String subscriberServer;
 
 
-    public NbiNotificationsProvider(List<String> topics,
-            String suscriberServer, String publisherServer,
+    public NbiNotificationsProvider(List<String> topicsService, List<String> topicsAlarm,
+            String subscriberServer, String publisherServer,
             RpcProviderService rpcProviderService, NotificationService notificationService,
             BindingDOMCodecServices bindingDOMCodecServices) {
         this.rpcService = rpcProviderService;
         this.notificationService = notificationService;
-        converter =  new JsonStringConverter<>(bindingDOMCodecServices);
-        for (String topic: topics) {
+        converterService =  new JsonStringConverter<>(bindingDOMCodecServices);
+        for (String topic: topicsService) {
+            LOG.info("Creating publisher for topic {}", topic);
+            publishersServiceMap.put(topic, new Publisher(topic, publisherServer, converterService));
+        }
+        converterAlarmService = new JsonStringConverter<>(bindingDOMCodecServices);
+        for (String topic: topicsAlarm) {
             LOG.info("Creating publisher for topic {}", topic);
-            publishersMap.put(topic, new Publisher(topic, publisherServer, converter));
+            publishersAlarmMap.put(topic, new PublisherAlarm(topic, publisherServer, converterAlarmService));
         }
-        this.suscriberServer = suscriberServer;
+        this.subscriberServer = subscriberServer;
     }
 
     /**
@@ -57,18 +66,21 @@ public class NbiNotificationsProvider {
     public void init() {
         LOG.info("NbiNotificationsProvider Session Initiated");
         rpcRegistration = rpcService.registerRpcImplementation(NbiNotificationsService.class,
-                new NbiNotificationsImpl(converter, suscriberServer));
+                new NbiNotificationsImpl(converterService, converterAlarmService, subscriberServer));
         listenerRegistration = notificationService.registerNotificationListener(
-                new NbiNotificationsListenerImpl(publishersMap));
+                new NbiNotificationsListenerImpl(publishersServiceMap, publishersAlarmMap));
     }
 
     /**
      * Method called when the blueprint container is destroyed.
      */
     public void close() {
-        for (Publisher publisher : publishersMap.values()) {
+        for (Publisher publisher : publishersServiceMap.values()) {
             publisher.close();
         }
+        for (PublisherAlarm publisherAlarm : publishersAlarmMap.values()) {
+            publisherAlarm.close();
+        }
         rpcRegistration.close();
         listenerRegistration.close();
         LOG.info("NbiNotificationsProvider Closed");