import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import javax.ws.rs.sse.Sse;
import javax.ws.rs.sse.SseEventSink;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.opendaylight.restconf.nb.rfc8040.ReceiveEventsParams;
-import org.opendaylight.restconf.nb.rfc8040.streams.RestconfStream.EncodingName;
+import org.opendaylight.restconf.server.spi.RestconfStream;
+import org.opendaylight.restconf.server.spi.RestconfStream.EncodingName;
import org.opendaylight.yangtools.concepts.Registration;
@ExtendWith(MockitoExtension.class)
class SSESessionHandlerTest {
@Mock
- private ScheduledExecutorService executorService;
+ private PingExecutor pingExecutor;
@Mock
private RestconfStream<?> stream;
@Mock
- private ScheduledFuture<?> pingFuture;
+ private Registration pingRegistration;
@Mock
private SseEventSink eventSink;
@Mock
@Mock
private Registration reg;
- private SSESessionHandler setup(final int maxFragmentSize, final int heartbeatInterval) throws Exception {
- final var sseSessionHandler = new SSESessionHandler(executorService, eventSink, sse, stream,
+ private SSESender setup(final int maxFragmentSize, final long heartbeatInterval) throws Exception {
+ final var sseSessionHandler = new SSESender(pingExecutor, eventSink, sse, stream,
EncodingName.RFC8040_XML, new ReceiveEventsParams(null, null, null, null, null, null, null),
maxFragmentSize, heartbeatInterval);
doReturn(reg).when(stream).addSubscriber(eq(sseSessionHandler), any(), any());
}
private void setupPing(final long maxFragmentSize, final long heartbeatInterval) {
- doReturn(pingFuture).when(executorService)
- .scheduleWithFixedDelay(any(Runnable.class), eq(heartbeatInterval), eq(heartbeatInterval),
- eq(TimeUnit.MILLISECONDS));
+ doReturn(pingRegistration).when(pingExecutor)
+ .startPingProcess(any(Runnable.class), eq(heartbeatInterval), eq(TimeUnit.MILLISECONDS));
}
@Test
void onSSEConnectedWithEnabledPing() throws Exception {
- final int heartbeatInterval = 1000;
+ final var heartbeatInterval = 1000L;
final var sseSessionHandler = setup(1000, heartbeatInterval);
sseSessionHandler.init();
- verify(executorService).scheduleWithFixedDelay(any(Runnable.class), eq((long) heartbeatInterval),
- eq((long) heartbeatInterval), eq(TimeUnit.MILLISECONDS));
+ verify(pingExecutor).startPingProcess(any(Runnable.class), eq(heartbeatInterval), eq(TimeUnit.MILLISECONDS));
}
@Test
final var sseSessionHandler = setup(1000, heartbeatInterval);
sseSessionHandler.init();
- verifyNoMoreInteractions(executorService);
+ verifyNoMoreInteractions(pingExecutor);
}
@Test
final var sseSessionHandler = setup(150, 8000);
setupPing(150, 8000);
sseSessionHandler.init();
- doReturn(false).when(pingFuture).isCancelled();
- doReturn(false).when(pingFuture).isDone();
+ doNothing().when(pingRegistration).close();
sseSessionHandler.close();
verify(reg).close();
- verify(pingFuture).cancel(anyBoolean());
}
@Test
setupPing(150, 8000);
sseSessionHandler.init();
+ doNothing().when(pingRegistration).close();
sseSessionHandler.close();
verify(reg).close();
- verify(pingFuture).cancel(anyBoolean());
}
@Test
sseSessionHandler.close();
verify(reg).close();
- verify(pingFuture, never()).cancel(anyBoolean());
+ verifyNoMoreInteractions(pingRegistration);
}
@Test