import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.eclipse.jdt.annotation.NonNull;
import org.opendaylight.transportpce.common.converter.JsonStringConverter;
Properties propsConsumer = NbiNotificationsUtils.loadProperties("subscriber.properties");
propsConsumer.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
propsConsumer.put(ConsumerConfig.CLIENT_ID_CONFIG, id);
+ propsConsumer.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
propsConsumer.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
propsConsumer.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG , deserializerConf);
propsConsumer.put(ConfigConstants.CONVERTER , deserializer);
final ConsumerRecords<String, D> consumerRecords = consumer.poll(Duration.ofMillis(1000));
List<D> notificationServiceList = new ArrayList<>();
YangInstanceIdentifier.of(name);
- for (ConsumerRecord<String, D> record : consumerRecords) {
+ for (ConsumerRecord<String, D> record : consumerRecords.records(new TopicPartition(topicName, 0))) {
if (record.value() != null) {
notificationServiceList.add(record.value());
}