import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
+import org.eclipse.jdt.annotation.NonNull;
import org.opendaylight.transportpce.common.converter.JsonStringConverter;
import org.opendaylight.transportpce.nbinotifications.serialization.ConfigConstants;
-import org.opendaylight.transportpce.nbinotifications.serialization.NotificationServiceDeserializer;
import org.opendaylight.transportpce.nbinotifications.utils.NbiNotificationsUtils;
-import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.get.notifications.service.output.NotificationService;
+import org.opendaylight.yangtools.yang.binding.DataObject;
+import org.opendaylight.yangtools.yang.common.QName;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class Subscriber {
+public class Subscriber<T extends DataObject, D> {
private static final Logger LOG = LoggerFactory.getLogger(Subscriber.class);
- private final Consumer<String, NotificationService> consumer;
+ private final Consumer<String, D> consumer;
- public Subscriber(String id, String groupId, String suscriberServer,
- JsonStringConverter<org.opendaylight.yang.gen.v1
- .nbi.notifications.rev210628.NotificationService> deserializer) {
+ public Subscriber(String id, String groupId, String subscriberServer, JsonStringConverter<T> deserializer,
+ Class<?> deserializerConf) {
Properties propsConsumer = NbiNotificationsUtils.loadProperties("subscriber.properties");
propsConsumer.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
propsConsumer.put(ConsumerConfig.CLIENT_ID_CONFIG, id);
propsConsumer.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
- propsConsumer.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG , NotificationServiceDeserializer.class);
+ propsConsumer.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG , deserializerConf);
propsConsumer.put(ConfigConstants.CONVERTER , deserializer);
- if (suscriberServer != null && !suscriberServer.isBlank()) {
- propsConsumer.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, suscriberServer);
+ if (subscriberServer != null && !subscriberServer.isBlank()) {
+ propsConsumer.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, subscriberServer);
}
- LOG.info("Suscribing for group id {}, client config id {} with properties {}", groupId, id, propsConsumer);
+ LOG.info("Subscribing for group id {}, client config id {} with properties {}", groupId, id, propsConsumer);
consumer = new KafkaConsumer<>(propsConsumer);
}
- public List<NotificationService> subscribeService(String topicName) {
+ public List<D> subscribe(String topicName, @NonNull QName name) {
LOG.info("Subscribe request to topic '{}' ", topicName);
consumer.subscribe(Collections.singleton(topicName));
- final ConsumerRecords<String, NotificationService> consumerRecords = consumer.poll(Duration.ofMillis(1000));
- List<NotificationService> notificationServiceList = new ArrayList<>();
- YangInstanceIdentifier.of(NotificationService.QNAME);
- for (ConsumerRecord<String, NotificationService> record : consumerRecords) {
+ final ConsumerRecords<String, D> consumerRecords = consumer.poll(Duration.ofMillis(1000));
+ List<D> notificationServiceList = new ArrayList<>();
+ YangInstanceIdentifier.of(name);
+ for (ConsumerRecord<String, D> record : consumerRecords) {
if (record.value() != null) {
notificationServiceList.add(record.value());
}
return notificationServiceList;
}
- @VisibleForTesting public Subscriber(Consumer<String, NotificationService> consumer) {
+ @VisibleForTesting public Subscriber(Consumer<String, D> consumer) {
this.consumer = consumer;
}
}