/*
 * Copyright © 2020 Orange, Inc. and others.  All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
8 package org.opendaylight.transportpce.nbinotifications.consumer;
11 import static org.junit.jupiter.api.Assertions.assertEquals;
12 import static org.junit.jupiter.api.Assertions.assertTrue;
14 import java.util.Collections;
15 import java.util.HashMap;
16 import java.util.List;
18 import org.apache.kafka.clients.consumer.ConsumerRecord;
19 import org.apache.kafka.clients.consumer.MockConsumer;
20 import org.apache.kafka.clients.consumer.OffsetResetStrategy;
21 import org.apache.kafka.common.TopicPartition;
22 import org.junit.jupiter.api.BeforeEach;
23 import org.junit.jupiter.api.Test;
24 import org.opendaylight.transportpce.nbinotifications.utils.NotificationServiceDataUtils;
25 import org.opendaylight.transportpce.test.AbstractTest;
26 import org.opendaylight.yang.gen.v1.nbi.notifications.rev211013.NotificationAlarmService;
27 import org.opendaylight.yang.gen.v1.nbi.notifications.rev211013.NotificationProcessService;
28 import org.opendaylight.yang.gen.v1.nbi.notifications.rev211013.NotificationTapiService;
29 import org.opendaylight.yang.gen.v1.nbi.notifications.rev211013.get.notifications.alarm.service.output.NotificationsAlarmService;
30 import org.opendaylight.yang.gen.v1.nbi.notifications.rev211013.get.notifications.process.service.output.NotificationsProcessService;
31 import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev181210.get.notification.list.output.Notification;
33 public class SubscriberTest extends AbstractTest {
34 private static final String TOPIC = "topic";
35 private static final int PARTITION = 0;
36 private MockConsumer<String, NotificationsProcessService> mockConsumer;
37 private MockConsumer<String, NotificationsAlarmService> mockConsumerAlarm;
38 private MockConsumer<String, Notification> mockConsumerTapi;
39 private Subscriber<NotificationProcessService, NotificationsProcessService> subscriberService;
40 private Subscriber<NotificationAlarmService, NotificationsAlarmService> subscriberAlarmService;
41 private Subscriber<NotificationTapiService, Notification> subscriberTapiService;
45 mockConsumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
46 mockConsumerAlarm = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
47 mockConsumerTapi = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
48 subscriberService = new Subscriber<>(mockConsumer);
49 subscriberAlarmService = new Subscriber<>(mockConsumerAlarm);
50 subscriberTapiService = new Subscriber<>(mockConsumerTapi);
54 void subscribeServiceShouldBeSuccessful() {
55 // from https://www.baeldung.com/kafka-mockconsumer
56 ConsumerRecord<String, NotificationsProcessService> record = new ConsumerRecord<>(
57 TOPIC, PARTITION, 0L, "key", NotificationServiceDataUtils.buildReceivedEvent());
58 mockConsumer.schedulePollTask(() -> {
59 mockConsumer.rebalance(Collections.singletonList(new TopicPartition(TOPIC, PARTITION)));
60 mockConsumer.addRecord(record);
63 Map<TopicPartition, Long> startOffsets = new HashMap<>();
64 TopicPartition tp = new TopicPartition(TOPIC, PARTITION);
65 startOffsets.put(tp, 0L);
66 mockConsumer.updateBeginningOffsets(startOffsets);
67 List<NotificationsProcessService> result = subscriberService.subscribe(TOPIC,
68 NotificationsProcessService.QNAME);
69 assertEquals(1, result.size(), "There should be 1 record");
70 assertTrue(mockConsumer.closed(), "Consumer should be closed");
74 void subscribeAlarmShouldBeSuccessful() {
75 // from https://www.baeldung.com/kafka-mockconsumer
76 ConsumerRecord<String, NotificationsAlarmService> record = new ConsumerRecord<>(
77 TOPIC, PARTITION, 0L, "key", NotificationServiceDataUtils.buildReceivedAlarmEvent());
78 mockConsumerAlarm.schedulePollTask(() -> {
79 mockConsumerAlarm.rebalance(Collections.singletonList(new TopicPartition(TOPIC, PARTITION)));
80 mockConsumerAlarm.addRecord(record);
83 Map<TopicPartition, Long> startOffsets = new HashMap<>();
84 TopicPartition tp = new TopicPartition(TOPIC, PARTITION);
85 startOffsets.put(tp, 0L);
86 mockConsumerAlarm.updateBeginningOffsets(startOffsets);
87 List<NotificationsAlarmService> result = subscriberAlarmService.subscribe(TOPIC,
88 NotificationsAlarmService.QNAME);
89 assertEquals(1, result.size(), "There should be 1 record");
90 assertTrue(mockConsumerAlarm.closed(), "Consumer should be closed");
94 void subscribeTapiAlarmShouldBeSuccessful() {
95 // from https://www.baeldung.com/kafka-mockconsumer
96 ConsumerRecord<String, Notification> record = new ConsumerRecord<>(
97 TOPIC, PARTITION, 0L, "key", NotificationServiceDataUtils.buildReceivedTapiAlarmEvent());
98 mockConsumerTapi.schedulePollTask(() -> {
99 mockConsumerTapi.rebalance(Collections.singletonList(new TopicPartition(TOPIC, PARTITION)));
100 mockConsumerTapi.addRecord(record);
103 Map<TopicPartition, Long> startOffsets = new HashMap<>();
104 TopicPartition tp = new TopicPartition(TOPIC, PARTITION);
105 startOffsets.put(tp, 0L);
106 mockConsumerTapi.updateBeginningOffsets(startOffsets);
107 List<Notification> result = subscriberTapiService.subscribe(TOPIC,
108 NotificationTapiService.QNAME);
109 assertEquals(1, result.size(), "There should be 1 record");
110 assertTrue(mockConsumerTapi.closed(), "Consumer should be closed");