<groupId>org.eclipse.jetty.websocket</groupId>
<artifactId>websocket-server</artifactId>
</dependency>
- <dependency>
- <groupId>org.glassfish.jersey.media</groupId>
- <artifactId>jersey-media-sse</artifactId>
- </dependency>
<!-- Testing Dependencies -->
<dependency>
<artifactId>jersey-hk2</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.glassfish.jersey.media</groupId>
+ <artifactId>jersey-media-sse</artifactId>
+ <scope>test</scope>
+ </dependency>
<dependency>
<groupId>org.opendaylight.mdsal</groupId>
<artifactId>mdsal-binding-test-utils</artifactId>
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.UriInfo;
-import org.glassfish.jersey.media.sse.EventOutput;
+import javax.ws.rs.sse.Sse;
+import javax.ws.rs.sse.SseEventSink;
public interface RestconfDataStreamService {
/**
*
* @param identifier path to target
* @param uriInfo URI info
- * @return {@link EventOutput}
*/
@GET
@Path("/{identifier:.+}")
@Produces(MediaType.SERVER_SENT_EVENTS)
- EventOutput getSSE(@Encoded @PathParam("identifier") String identifier, @Context UriInfo uriInfo);
+ void getSSE(@Encoded @PathParam("identifier") String identifier, @Context UriInfo uriInfo,
+ @Context SseEventSink sink, @Context Sse sse);
}
import javax.inject.Inject;
import javax.inject.Singleton;
import javax.ws.rs.core.UriInfo;
-import org.glassfish.jersey.media.sse.EventOutput;
+import javax.ws.rs.sse.Sse;
+import javax.ws.rs.sse.SseEventSink;
import org.opendaylight.controller.config.threadpool.ScheduledThreadPool;
import org.opendaylight.restconf.common.errors.RestconfDocumentedException;
import org.opendaylight.restconf.common.errors.RestconfError.ErrorTag;
}
@Override
- public EventOutput getSSE(final String identifier, final UriInfo uriInfo) {
+ public void getSSE(final String identifier, final UriInfo uriInfo, final SseEventSink sink, final Sse sse) {
final String streamName = ListenersBroker.createStreamNameFromUri(identifier);
final BaseListenerInterface listener = listenersBroker.getListenerFor(streamName)
.orElseThrow(() -> {
});
LOG.debug("Listener for stream with name {} has been found, SSE session handler will be created.", streamName);
- final EventOutput eventOutput = new EventOutput();
- final SSESessionHandler handler = new SSESessionHandler(executorService, eventOutput, listener,
+
+ // FIXME: invert control here: we should call 'listener.addSession()', which in turn should call
+ // handler.init()/handler.close()
+ final SSESessionHandler handler = new SSESessionHandler(executorService, sink, sse, listener,
maximumFragmentLength, heartbeatInterval);
handler.init();
- return eventOutput;
}
}
*
* @return Reusable instance of {@link ListenersBroker}.
*/
+ // FIXME: remove this global singleton
public static synchronized ListenersBroker getInstance() {
if (listenersBroker == null) {
listenersBroker = new ListenersBroker();
}
@SuppressWarnings({"checkstyle:IllegalCatch"})
- private void removeAndCloseNotificationListenerTemplate(NotificationListenerAdapter listener) {
+ private void removeAndCloseNotificationListenerTemplate(final NotificationListenerAdapter listener) {
try {
requireNonNull(listener).close();
if (notificationListeners.inverse().remove(listener) == null) {
import com.google.common.base.CharMatcher;
import com.google.common.base.Strings;
-import java.io.IOException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
-import org.glassfish.jersey.media.sse.EventOutput;
-import org.glassfish.jersey.media.sse.OutboundEvent;
+import javax.ws.rs.sse.Sse;
+import javax.ws.rs.sse.SseEventSink;
import org.opendaylight.restconf.nb.rfc8040.streams.StreamSessionHandler;
import org.opendaylight.restconf.nb.rfc8040.streams.listeners.BaseListenerInterface;
import org.slf4j.Logger;
private final BaseListenerInterface listener;
private final int maximumFragmentLength;
private final int heartbeatInterval;
- private final EventOutput output;
+ private final SseEventSink sink;
+ private final Sse sse;
+
private ScheduledFuture<?> pingProcess;
/**
* @param heartbeatInterval Interval in milliseconds of sending of ping control frames to remote endpoint to keep
* session up. Ping control frames are disabled if this parameter is set to 0.
*/
- public SSESessionHandler(final ScheduledExecutorService executorService, final EventOutput output,
+ public SSESessionHandler(final ScheduledExecutorService executorService, final SseEventSink sink, final Sse sse,
final BaseListenerInterface listener, final int maximumFragmentLength, final int heartbeatInterval) {
this.executorService = executorService;
- this.output = output;
+ this.sse = sse;
+ this.sink = sink;
this.listener = listener;
this.maximumFragmentLength = maximumFragmentLength;
this.heartbeatInterval = heartbeatInterval;
}
/**
- * Sending of string message to outbound Server-Sent Events channel
- * {@link org.glassfish.jersey.media.sse.EventOutput}. SSE is automatically split to fragments with new line
- * character. If the maximum fragment length is set to non-zero positive value and input message exceeds this
- * value, message is manually fragmented to multiple message fragments which are send individually. Previous
- * fragmentation is removed.
+ * Sending of string message to outbound Server-Sent Events channel {@link SseEventSink}. SSE is automatically split
+ * to fragments with new line character. If the maximum fragment length is set to non-zero positive value and input
+ * message exceeds this value, message is manually fragmented to multiple message fragments which are send
+ * individually. Previous fragmentation is removed.
*
* @param message Message data to be send over web-socket session.
*/
// FIXME: should this be tolerated?
return;
}
- if (output.isClosed()) {
- close();
- return;
- }
- if (maximumFragmentLength != 0 && message.length() > maximumFragmentLength) {
- sendMessage(splitMessageToFragments(message));
+ if (!sink.isClosed()) {
+ final String toSend = maximumFragmentLength != 0 && message.length() > maximumFragmentLength
+ ? splitMessageToFragments(message) : message;
+ sink.send(sse.newEvent(toSend));
} else {
- sendMessage(message);
- }
- }
-
- private void sendMessage(final String message) {
- try {
- output.write(new OutboundEvent.Builder().data(String.class, message).build());
- } catch (IOException e) {
- LOG.warn("Connection from client {} is closed", this);
- LOG.debug("Connection from client is closed:", e);
- }
- }
-
- private void sendComment(final String message) {
- try {
- output.write(new OutboundEvent.Builder().comment(message).build());
- } catch (IOException e) {
- LOG.warn("Connection from client {} is closed", this);
- LOG.debug("Connection from client is closed:", e);
+ close();
}
}
}
private synchronized void sendPingMessage() {
- if (output.isClosed()) {
+ if (!sink.isClosed()) {
+ LOG.debug("sending PING:{}", PING_PAYLOAD);
+ sink.send(sse.newEventBuilder().comment(PING_PAYLOAD).build());
+ } else {
close();
- return;
}
- LOG.debug("sending PING:{}", PING_PAYLOAD);
- sendComment(PING_PAYLOAD);
}
private void stopPingProcess() {
@Override
public synchronized boolean isConnected() {
- return !output.isClosed();
+ return !sink.isClosed();
}
// TODO:return some type of identification of connection
@Override
public String toString() {
- return output.toString();
+ return sink.toString();
}
}
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doCallRealMethod;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
-import org.glassfish.jersey.media.sse.EventOutput;
+import javax.ws.rs.sse.Sse;
+import javax.ws.rs.sse.SseEventSink;
import org.glassfish.jersey.media.sse.OutboundEvent;
import org.junit.Test;
import org.junit.runner.RunWith;
@Mock
private ScheduledFuture<?> pingFuture;
@Mock
- private EventOutput eventOutput;
+ private SseEventSink eventSink;
+ @Mock
+ private Sse sse;
private SSESessionHandler setup(final int maxFragmentSize, final int heartbeatInterval) {
- final SSESessionHandler sseSessionHandler = new SSESessionHandler(executorService, eventOutput, listener,
+ doCallRealMethod().when(sse).newEvent(any());
+ doAnswer(inv -> new OutboundEvent.Builder()).when(sse).newEventBuilder();
+
+ final SSESessionHandler sseSessionHandler = new SSESessionHandler(executorService, eventSink, sse, listener,
maxFragmentSize, heartbeatInterval);
doReturn(pingFuture).when(executorService)
.scheduleWithFixedDelay(any(Runnable.class), eq((long) heartbeatInterval), eq((long) heartbeatInterval),
@Test
public void sendDataMessageWithDisabledFragmentation() throws IOException {
final SSESessionHandler sseSessionHandler = setup(0, 0);
- doReturn(false).when(eventOutput).isClosed();
+ doReturn(false).when(eventSink).isClosed();
sseSessionHandler.init();
final String testMessage = generateRandomStringOfLength(100);
sseSessionHandler.sendDataMessage(testMessage);
ArgumentCaptor<OutboundEvent> cap = ArgumentCaptor.forClass(OutboundEvent.class);
- verify(eventOutput, times(1)).write(cap.capture());
+ verify(eventSink, times(1)).send(cap.capture());
OutboundEvent event = cap.getAllValues().get(0);
assertNotNull(event);
assertEquals(event.getData(), testMessage);
@Test
public void sendDataMessageWithDisabledFragAndDeadSession() throws IOException {
final SSESessionHandler sseSessionHandler = setup(0, 0);
- doReturn(true).when(eventOutput).isClosed();
+ doReturn(true).when(eventSink).isClosed();
sseSessionHandler.init();
final String testMessage = generateRandomStringOfLength(11);
sseSessionHandler.sendDataMessage(testMessage);
ArgumentCaptor<OutboundEvent> cap = ArgumentCaptor.forClass(OutboundEvent.class);
- verify(eventOutput, times(0)).write(cap.capture());
+ verify(eventSink, times(0)).send(cap.capture());
}
@Test
public void sendDataMessageWithEnabledFragAndSmallMessage() throws IOException {
final SSESessionHandler sseSessionHandler = setup(100, 0);
- doReturn(false).when(eventOutput).isClosed();
+ doReturn(false).when(eventSink).isClosed();
sseSessionHandler.init();
// in both cases, fragmentation should not be applied
ArgumentCaptor<OutboundEvent> cap = ArgumentCaptor.forClass(OutboundEvent.class);
// without fragmentation there will be 2 write calls
- verify(eventOutput, times(2)).write(cap.capture());
+ verify(eventSink, times(2)).send(cap.capture());
OutboundEvent event1 = cap.getAllValues().get(0);
OutboundEvent event2 = cap.getAllValues().get(1);
assertNotNull(event1);
sseSessionHandler.init();
sseSessionHandler.sendDataMessage("");
- verifyNoMoreInteractions(eventOutput);
+ verifyNoMoreInteractions(eventSink);
}
@Test
public void sendDataMessageWithEnabledFragAndLargeMessage1() throws IOException {
final SSESessionHandler sseSessionHandler = setup(100, 0);
- doReturn(false).when(eventOutput).isClosed();
+ doReturn(false).when(eventSink).isClosed();
sseSessionHandler.init();
// there should be 10 fragments of length 100 characters
ArgumentCaptor<OutboundEvent> cap = ArgumentCaptor.forClass(OutboundEvent.class);
// SSE automatically send fragmented packet ended with new line character due to eventOutput
// have only 1 write call
- verify(eventOutput, times(1)).write(cap.capture());
+ verify(eventSink, times(1)).send(cap.capture());
OutboundEvent event = cap.getAllValues().get(0);
assertNotNull(event);
String[] lines = ((String) event.getData()).split("\r\n|\r|\n");