Implementation of T-API notification rpcs
[transportpce.git] / nbinotifications / src / main / java / org / opendaylight / transportpce / nbinotifications / consumer / Subscriber.java
index 7ebc0bdf620cb09a11bc9e3e31fe691298374f5a..154cbb80f4d191bd225a8d24c2bc11f9d52d1de2 100644 (file)
@@ -18,6 +18,7 @@ import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.eclipse.jdt.annotation.NonNull;
 import org.opendaylight.transportpce.common.converter.JsonStringConverter;
@@ -39,6 +40,7 @@ public class Subscriber<T extends DataObject, D> {
         Properties propsConsumer = NbiNotificationsUtils.loadProperties("subscriber.properties");
         propsConsumer.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
         propsConsumer.put(ConsumerConfig.CLIENT_ID_CONFIG, id);
+        propsConsumer.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
         propsConsumer.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
         propsConsumer.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG , deserializerConf);
         propsConsumer.put(ConfigConstants.CONVERTER , deserializer);
@@ -55,7 +57,7 @@ public class Subscriber<T extends DataObject, D> {
         final ConsumerRecords<String, D> consumerRecords = consumer.poll(Duration.ofMillis(1000));
         List<D> notificationServiceList = new ArrayList<>();
         YangInstanceIdentifier.of(name);
-        for (ConsumerRecord<String, D> record : consumerRecords) {
+        for (ConsumerRecord<String, D> record : consumerRecords.records(new TopicPartition(topicName, 0))) {
             if (record.value() != null) {
                 notificationServiceList.add(record.value());
             }