import org.apache.kafka.common.serialization.StringSerializer;
import org.opendaylight.transportpce.common.converter.JsonStringConverter;
import org.opendaylight.transportpce.nbinotifications.serialization.ConfigConstants;
-import org.opendaylight.transportpce.nbinotifications.serialization.NotificationServiceSerializer;
import org.opendaylight.transportpce.nbinotifications.utils.NbiNotificationsUtils;
-import org.opendaylight.yang.gen.v1.nbi.notifications.rev210628.NotificationService;
+import org.opendaylight.yangtools.yang.binding.DataObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class Publisher {
+public class Publisher<T extends DataObject> {
private static final Logger LOG = LoggerFactory.getLogger(Publisher.class);
private final String id;
- private final Producer<String, NotificationService> producer;
+ private final Producer<String, T> producer;
- public Publisher(String id, String publisherServer, JsonStringConverter<NotificationService> serializer) {
+ public Publisher(String id, String publisherServer, JsonStringConverter<T> serializer, Class<?> serializerConf) {
Properties properties = NbiNotificationsUtils.loadProperties("publisher.properties");
properties.put(ProducerConfig.CLIENT_ID_CONFIG, id);
if (publisherServer != null && !publisherServer.isBlank()) {
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, publisherServer);
}
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
- properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG , NotificationServiceSerializer.class);
+ properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG , serializerConf);
properties.put(ConfigConstants.CONVERTER , serializer);
- LOG.info("Creationg publisher for id {} with properties {}", id, properties);
+ LOG.info("Creation publisher for id {} with properties {}", id, properties);
producer = new KafkaProducer<>(properties);
this.id = id;
}
- @VisibleForTesting Publisher(String id, Producer<String, NotificationService> producer) {
+ @VisibleForTesting Publisher(String id, Producer<String, T> producer) {
this.producer = producer;
this.id = id;
}
producer.close();
}
- public void sendEvent(NotificationService notificationService) {
- LOG.info("SendEvent request to topic '{}' ", notificationService.getConnectionType().getName());
- producer.send(new ProducerRecord<>(notificationService.getConnectionType().getName(), id, notificationService));
+ public void sendEvent(T notification, String topic) {
+ LOG.info("SendEvent request to topic '{}' ", topic);
+ producer.send(new ProducerRecord<>(topic, id, notification));
producer.flush();
}
}