Refactor NBINotification & add ServiceListener tests
[transportpce.git] / nbinotifications / src / main / java / org / opendaylight / transportpce / nbinotifications / consumer / Subscriber.java
index 735c6a8d25a2de0edb64b3790716300a72f2d7a3..7ebc0bdf620cb09a11bc9e3e31fe691298374f5a 100644 (file)
@@ -19,43 +19,43 @@ import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.serialization.StringDeserializer;
+import org.eclipse.jdt.annotation.NonNull;
 import org.opendaylight.transportpce.common.converter.JsonStringConverter;
 import org.opendaylight.transportpce.nbinotifications.serialization.ConfigConstants;
-import org.opendaylight.transportpce.nbinotifications.serialization.NotificationServiceDeserializer;
 import org.opendaylight.transportpce.nbinotifications.utils.NbiNotificationsUtils;
-import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.get.notifications.service.output.NotificationService;
+import org.opendaylight.yangtools.yang.binding.DataObject;
+import org.opendaylight.yangtools.yang.common.QName;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class Subscriber {
+public class Subscriber<T extends DataObject, D> {
     private static final Logger LOG = LoggerFactory.getLogger(Subscriber.class);
 
-    private final Consumer<String, NotificationService> consumer;
+    private final Consumer<String, D> consumer;
 
-    public Subscriber(String id, String groupId, String suscriberServer,
-            JsonStringConverter<org.opendaylight.yang.gen.v1
-                .nbi.notifications.rev210628.NotificationService> deserializer) {
+    public Subscriber(String id, String groupId, String subscriberServer, JsonStringConverter<T> deserializer,
+                      Class<?> deserializerConf) {
         Properties propsConsumer = NbiNotificationsUtils.loadProperties("subscriber.properties");
         propsConsumer.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
         propsConsumer.put(ConsumerConfig.CLIENT_ID_CONFIG, id);
         propsConsumer.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
-        propsConsumer.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG , NotificationServiceDeserializer.class);
+        propsConsumer.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG , deserializerConf);
         propsConsumer.put(ConfigConstants.CONVERTER , deserializer);
-        if (suscriberServer != null && !suscriberServer.isBlank()) {
-            propsConsumer.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, suscriberServer);
+        if (subscriberServer != null && !subscriberServer.isBlank()) {
+            propsConsumer.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, subscriberServer);
         }
-        LOG.info("Suscribing for group id {}, client config id {} with properties {}", groupId, id, propsConsumer);
+        LOG.info("Subscribing for group id {}, client config id {} with properties {}", groupId, id, propsConsumer);
         consumer = new KafkaConsumer<>(propsConsumer);
     }
 
-    public List<NotificationService> subscribeService(String topicName) {
+    public List<D> subscribe(String topicName, @NonNull QName name) {
         LOG.info("Subscribe request to topic '{}' ", topicName);
         consumer.subscribe(Collections.singleton(topicName));
-        final ConsumerRecords<String, NotificationService> consumerRecords = consumer.poll(Duration.ofMillis(1000));
-        List<NotificationService> notificationServiceList = new ArrayList<>();
-        YangInstanceIdentifier.of(NotificationService.QNAME);
-        for (ConsumerRecord<String, NotificationService> record : consumerRecords) {
+        final ConsumerRecords<String, D> consumerRecords = consumer.poll(Duration.ofMillis(1000));
+        List<D> notificationServiceList = new ArrayList<>();
+        YangInstanceIdentifier.of(name);
+        for (ConsumerRecord<String, D> record : consumerRecords) {
             if (record.value() != null) {
                 notificationServiceList.add(record.value());
             }
@@ -66,7 +66,7 @@ public class Subscriber {
         return notificationServiceList;
     }
 
-    @VisibleForTesting public Subscriber(Consumer<String, NotificationService> consumer) {
+    @VisibleForTesting public Subscriber(Consumer<String, D> consumer) {
         this.consumer = consumer;
     }
 }