X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=transportpce.git;a=blobdiff_plain;f=nbinotifications%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Ftransportpce%2Fnbinotifications%2Fconsumer%2FSubscriber.java;h=154cbb80f4d191bd225a8d24c2bc11f9d52d1de2;hp=7ebc0bdf620cb09a11bc9e3e31fe691298374f5a;hb=5b61d96f9251589244940215ee9f62e7ff204d11;hpb=4629e64c30400e7d4ee57d046ef4a4f9cb64c31a diff --git a/nbinotifications/src/main/java/org/opendaylight/transportpce/nbinotifications/consumer/Subscriber.java b/nbinotifications/src/main/java/org/opendaylight/transportpce/nbinotifications/consumer/Subscriber.java index 7ebc0bdf6..154cbb80f 100644 --- a/nbinotifications/src/main/java/org/opendaylight/transportpce/nbinotifications/consumer/Subscriber.java +++ b/nbinotifications/src/main/java/org/opendaylight/transportpce/nbinotifications/consumer/Subscriber.java @@ -18,6 +18,7 @@ import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.StringDeserializer; import org.eclipse.jdt.annotation.NonNull; import org.opendaylight.transportpce.common.converter.JsonStringConverter; @@ -39,6 +40,7 @@ public class Subscriber { Properties propsConsumer = NbiNotificationsUtils.loadProperties("subscriber.properties"); propsConsumer.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); propsConsumer.put(ConsumerConfig.CLIENT_ID_CONFIG, id); + propsConsumer.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); propsConsumer.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); propsConsumer.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG , deserializerConf); propsConsumer.put(ConfigConstants.CONVERTER , deserializer); @@ -55,7 +57,7 @@ public class Subscriber { final ConsumerRecords consumerRecords = consumer.poll(Duration.ofMillis(1000)); List notificationServiceList = new ArrayList<>(); YangInstanceIdentifier.of(name); - for (ConsumerRecord record : consumerRecords) { + for (ConsumerRecord record : consumerRecords.records(new TopicPartition(topicName, 0))) { if (record.value() != null) { notificationServiceList.add(record.value()); }