From c2baf51aa870d005561658057b0674d66177d813 Mon Sep 17 00:00:00 2001 From: Robert Varga Date: Mon, 17 May 2021 16:00:42 +0200 Subject: [PATCH] Switch to using SseEventSink JAX-RS 2.1 has a standard way of working with SSE, do not use Jersey-specific APIs (except in test). JIRA: NETCONF-775 Change-Id: I46c2b80be52d2313ab9193361da788f41923c175 Signed-off-by: Robert Varga --- restconf/restconf-nb-rfc8040/pom.xml | 9 +-- .../api/RestconfDataStreamService.java | 7 +- .../impl/RestconfDataStreamServiceImpl.java | 12 ++-- .../streams/listeners/ListenersBroker.java | 3 +- .../streams/sse/SSESessionHandler.java | 65 +++++++------------ .../streams/sse/SSESessionHandlerTest.java | 32 +++++---- 6 files changed, 61 insertions(+), 67 deletions(-) diff --git a/restconf/restconf-nb-rfc8040/pom.xml b/restconf/restconf-nb-rfc8040/pom.xml index 353773ae52..45585124bf 100644 --- a/restconf/restconf-nb-rfc8040/pom.xml +++ b/restconf/restconf-nb-rfc8040/pom.xml @@ -152,10 +152,6 @@ org.eclipse.jetty.websocket websocket-server - - org.glassfish.jersey.media - jersey-media-sse - @@ -168,6 +164,11 @@ jersey-hk2 test + + org.glassfish.jersey.media + jersey-media-sse + test + org.opendaylight.mdsal mdsal-binding-test-utils diff --git a/restconf/restconf-nb-rfc8040/src/main/java/org/opendaylight/restconf/nb/rfc8040/rests/services/api/RestconfDataStreamService.java b/restconf/restconf-nb-rfc8040/src/main/java/org/opendaylight/restconf/nb/rfc8040/rests/services/api/RestconfDataStreamService.java index 0891f6e238..b9f58fb9c7 100644 --- a/restconf/restconf-nb-rfc8040/src/main/java/org/opendaylight/restconf/nb/rfc8040/rests/services/api/RestconfDataStreamService.java +++ b/restconf/restconf-nb-rfc8040/src/main/java/org/opendaylight/restconf/nb/rfc8040/rests/services/api/RestconfDataStreamService.java @@ -15,7 +15,8 @@ import javax.ws.rs.Produces; import javax.ws.rs.core.Context; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.UriInfo; -import org.glassfish.jersey.media.sse.EventOutput; +import javax.ws.rs.sse.Sse; +import javax.ws.rs.sse.SseEventSink; public interface RestconfDataStreamService { /** @@ -23,10 +24,10 @@ public interface RestconfDataStreamService { * * @param identifier path to target * @param uriInfo URI info - * @return {@link EventOutput} */ @GET @Path("/{identifier:.+}") @Produces(MediaType.SERVER_SENT_EVENTS) - EventOutput getSSE(@Encoded @PathParam("identifier") String identifier, @Context UriInfo uriInfo); + void getSSE(@Encoded @PathParam("identifier") String identifier, @Context UriInfo uriInfo, + @Context SseEventSink sink, @Context Sse sse); } diff --git a/restconf/restconf-nb-rfc8040/src/main/java/org/opendaylight/restconf/nb/rfc8040/rests/services/impl/RestconfDataStreamServiceImpl.java b/restconf/restconf-nb-rfc8040/src/main/java/org/opendaylight/restconf/nb/rfc8040/rests/services/impl/RestconfDataStreamServiceImpl.java index 6befc25763..3d0e4b5c7a 100644 --- a/restconf/restconf-nb-rfc8040/src/main/java/org/opendaylight/restconf/nb/rfc8040/rests/services/impl/RestconfDataStreamServiceImpl.java +++ b/restconf/restconf-nb-rfc8040/src/main/java/org/opendaylight/restconf/nb/rfc8040/rests/services/impl/RestconfDataStreamServiceImpl.java @@ -11,7 +11,8 @@ import java.util.concurrent.ScheduledExecutorService; import javax.inject.Inject; import javax.inject.Singleton; import javax.ws.rs.core.UriInfo; -import org.glassfish.jersey.media.sse.EventOutput; +import javax.ws.rs.sse.Sse; +import javax.ws.rs.sse.SseEventSink; import org.opendaylight.controller.config.threadpool.ScheduledThreadPool; import org.opendaylight.restconf.common.errors.RestconfDocumentedException; import org.opendaylight.restconf.common.errors.RestconfError.ErrorTag; @@ -45,7 +46,7 @@ public class RestconfDataStreamServiceImpl implements RestconfDataStreamService } @Override - public EventOutput getSSE(final String identifier, final UriInfo uriInfo) { + public void getSSE(final String identifier, final UriInfo uriInfo, final SseEventSink sink, final Sse sse) { final String streamName = ListenersBroker.createStreamNameFromUri(identifier); final BaseListenerInterface listener = listenersBroker.getListenerFor(streamName) .orElseThrow(() -> { @@ -54,10 +55,11 @@ public class RestconfDataStreamServiceImpl implements RestconfDataStreamService }); LOG.debug("Listener for stream with name {} has been found, SSE session handler will be created.", streamName); - final EventOutput eventOutput = new EventOutput(); - final SSESessionHandler handler = new SSESessionHandler(executorService, eventOutput, listener, + + // FIXME: invert control here: we should call 'listener.addSession()', which in turn should call + // handler.init()/handler.close() + final SSESessionHandler handler = new SSESessionHandler(executorService, sink, sse, listener, maximumFragmentLength, heartbeatInterval); handler.init(); - return eventOutput; } } diff --git a/restconf/restconf-nb-rfc8040/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/listeners/ListenersBroker.java b/restconf/restconf-nb-rfc8040/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/listeners/ListenersBroker.java index d1dadcf22e..b27502ac28 100644 --- a/restconf/restconf-nb-rfc8040/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/listeners/ListenersBroker.java +++ b/restconf/restconf-nb-rfc8040/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/listeners/ListenersBroker.java @@ -49,6 +49,7 @@ public final class ListenersBroker { * * @return Reusable instance of {@link ListenersBroker}. */ + // FIXME: remove this global singleton public static synchronized ListenersBroker getInstance() { if (listenersBroker == null) { listenersBroker = new ListenersBroker(); @@ -303,7 +304,7 @@ public final class ListenersBroker { } @SuppressWarnings({"checkstyle:IllegalCatch"}) - private void removeAndCloseNotificationListenerTemplate(NotificationListenerAdapter listener) { + private void removeAndCloseNotificationListenerTemplate(final NotificationListenerAdapter listener) { try { requireNonNull(listener).close(); if (notificationListeners.inverse().remove(listener) == null) { diff --git a/restconf/restconf-nb-rfc8040/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/sse/SSESessionHandler.java b/restconf/restconf-nb-rfc8040/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/sse/SSESessionHandler.java index 4d327b4c54..b65ff2bea1 100644 --- a/restconf/restconf-nb-rfc8040/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/sse/SSESessionHandler.java +++ b/restconf/restconf-nb-rfc8040/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/sse/SSESessionHandler.java @@ -9,12 +9,11 @@ package org.opendaylight.restconf.nb.rfc8040.streams.sse; import com.google.common.base.CharMatcher; import com.google.common.base.Strings; -import java.io.IOException; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; -import org.glassfish.jersey.media.sse.EventOutput; -import org.glassfish.jersey.media.sse.OutboundEvent; +import javax.ws.rs.sse.Sse; +import javax.ws.rs.sse.SseEventSink; import org.opendaylight.restconf.nb.rfc8040.streams.StreamSessionHandler; import org.opendaylight.restconf.nb.rfc8040.streams.listeners.BaseListenerInterface; import org.slf4j.Logger; @@ -34,7 +33,9 @@ public class SSESessionHandler implements StreamSessionHandler { private final BaseListenerInterface listener; private final int maximumFragmentLength; private final int heartbeatInterval; - private final EventOutput output; + private final SseEventSink sink; + private final Sse sse; + private ScheduledFuture pingProcess; /** @@ -53,10 +54,11 @@ public class SSESessionHandler implements StreamSessionHandler { * @param heartbeatInterval Interval in milliseconds of sending of ping control frames to remote endpoint to keep * session up. Ping control frames are disabled if this parameter is set to 0. */ - public SSESessionHandler(final ScheduledExecutorService executorService, final EventOutput output, + public SSESessionHandler(final ScheduledExecutorService executorService, final SseEventSink sink, final Sse sse, final BaseListenerInterface listener, final int maximumFragmentLength, final int heartbeatInterval) { this.executorService = executorService; - this.output = output; + this.sse = sse; + this.sink = sink; this.listener = listener; this.maximumFragmentLength = maximumFragmentLength; this.heartbeatInterval = heartbeatInterval; @@ -83,11 +85,10 @@ public class SSESessionHandler implements StreamSessionHandler { } /** - * Sending of string message to outbound Server-Sent Events channel - * {@link org.glassfish.jersey.media.sse.EventOutput}. SSE is automatically split to fragments with new line - * character. If the maximum fragment length is set to non-zero positive value and input message exceeds this - * value, message is manually fragmented to multiple message fragments which are send individually. Previous - * fragmentation is removed. + * Sending of string message to outbound Server-Sent Events channel {@link SseEventSink}. SSE is automatically split + * to fragments with new line character. If the maximum fragment length is set to non-zero positive value and input + * message exceeds this value, message is manually fragmented to multiple message fragments which are send + * individually. Previous fragmentation is removed. * * @param message Message data to be send over web-socket session. */ @@ -97,32 +98,12 @@ public class SSESessionHandler implements StreamSessionHandler { // FIXME: should this be tolerated? return; } - if (output.isClosed()) { - close(); - return; - } - if (maximumFragmentLength != 0 && message.length() > maximumFragmentLength) { - sendMessage(splitMessageToFragments(message)); + if (!sink.isClosed()) { + final String toSend = maximumFragmentLength != 0 && message.length() > maximumFragmentLength + ? splitMessageToFragments(message) : message; + sink.send(sse.newEvent(toSend)); } else { - sendMessage(message); - } - } - - private void sendMessage(final String message) { - try { - output.write(new OutboundEvent.Builder().data(String.class, message).build()); - } catch (IOException e) { - LOG.warn("Connection from client {} is closed", this); - LOG.debug("Connection from client is closed:", e); - } - } - - private void sendComment(final String message) { - try { - output.write(new OutboundEvent.Builder().comment(message).build()); - } catch (IOException e) { - LOG.warn("Connection from client {} is closed", this); - LOG.debug("Connection from client is closed:", e); + close(); } } @@ -144,12 +125,12 @@ public class SSESessionHandler implements StreamSessionHandler { } private synchronized void sendPingMessage() { - if (output.isClosed()) { + if (!sink.isClosed()) { + LOG.debug("sending PING:{}", PING_PAYLOAD); + sink.send(sse.newEventBuilder().comment(PING_PAYLOAD).build()); + } else { close(); - return; } - LOG.debug("sending PING:{}", PING_PAYLOAD); - sendComment(PING_PAYLOAD); } private void stopPingProcess() { @@ -160,12 +141,12 @@ public class SSESessionHandler implements StreamSessionHandler { @Override public synchronized boolean isConnected() { - return !output.isClosed(); + return !sink.isClosed(); } // TODO:return some type of identification of connection @Override public String toString() { - return output.toString(); + return sink.toString(); } } diff --git a/restconf/restconf-nb-rfc8040/src/test/java/org/opendaylight/restconf/nb/rfc8040/streams/sse/SSESessionHandlerTest.java b/restconf/restconf-nb-rfc8040/src/test/java/org/opendaylight/restconf/nb/rfc8040/streams/sse/SSESessionHandlerTest.java index f470e1ba4c..5916a607c5 100644 --- a/restconf/restconf-nb-rfc8040/src/test/java/org/opendaylight/restconf/nb/rfc8040/streams/sse/SSESessionHandlerTest.java +++ b/restconf/restconf-nb-rfc8040/src/test/java/org/opendaylight/restconf/nb/rfc8040/streams/sse/SSESessionHandlerTest.java @@ -12,6 +12,8 @@ import static org.junit.Assert.assertNotNull; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doCallRealMethod; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; @@ -22,7 +24,8 @@ import java.io.IOException; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; -import org.glassfish.jersey.media.sse.EventOutput; +import javax.ws.rs.sse.Sse; +import javax.ws.rs.sse.SseEventSink; import org.glassfish.jersey.media.sse.OutboundEvent; import org.junit.Test; import org.junit.runner.RunWith; @@ -40,10 +43,15 @@ public class SSESessionHandlerTest { @Mock private ScheduledFuture pingFuture; @Mock - private EventOutput eventOutput; + private SseEventSink eventSink; + @Mock + private Sse sse; private SSESessionHandler setup(final int maxFragmentSize, final int heartbeatInterval) { - final SSESessionHandler sseSessionHandler = new SSESessionHandler(executorService, eventOutput, listener, + doCallRealMethod().when(sse).newEvent(any()); + doAnswer(inv -> new OutboundEvent.Builder()).when(sse).newEventBuilder(); + + final SSESessionHandler sseSessionHandler = new SSESessionHandler(executorService, eventSink, sse, listener, maxFragmentSize, heartbeatInterval); doReturn(pingFuture).when(executorService) .scheduleWithFixedDelay(any(Runnable.class), eq((long) heartbeatInterval), eq((long) heartbeatInterval), @@ -119,13 +127,13 @@ public class SSESessionHandlerTest { @Test public void sendDataMessageWithDisabledFragmentation() throws IOException { final SSESessionHandler sseSessionHandler = setup(0, 0); - doReturn(false).when(eventOutput).isClosed(); + doReturn(false).when(eventSink).isClosed(); sseSessionHandler.init(); final String testMessage = generateRandomStringOfLength(100); sseSessionHandler.sendDataMessage(testMessage); ArgumentCaptor cap = ArgumentCaptor.forClass(OutboundEvent.class); - verify(eventOutput, times(1)).write(cap.capture()); + verify(eventSink, times(1)).send(cap.capture()); OutboundEvent event = cap.getAllValues().get(0); assertNotNull(event); assertEquals(event.getData(), testMessage); @@ -134,19 +142,19 @@ public class SSESessionHandlerTest { @Test public void sendDataMessageWithDisabledFragAndDeadSession() throws IOException { final SSESessionHandler sseSessionHandler = setup(0, 0); - doReturn(true).when(eventOutput).isClosed(); + doReturn(true).when(eventSink).isClosed(); sseSessionHandler.init(); final String testMessage = generateRandomStringOfLength(11); sseSessionHandler.sendDataMessage(testMessage); ArgumentCaptor cap = ArgumentCaptor.forClass(OutboundEvent.class); - verify(eventOutput, times(0)).write(cap.capture()); + verify(eventSink, times(0)).send(cap.capture()); } @Test public void sendDataMessageWithEnabledFragAndSmallMessage() throws IOException { final SSESessionHandler sseSessionHandler = setup(100, 0); - doReturn(false).when(eventOutput).isClosed(); + doReturn(false).when(eventSink).isClosed(); sseSessionHandler.init(); // in both cases, fragmentation should not be applied @@ -157,7 +165,7 @@ public class SSESessionHandlerTest { ArgumentCaptor cap = ArgumentCaptor.forClass(OutboundEvent.class); // without fragmentation there will be 2 write calls - verify(eventOutput, times(2)).write(cap.capture()); + verify(eventSink, times(2)).send(cap.capture()); OutboundEvent event1 = cap.getAllValues().get(0); OutboundEvent event2 = cap.getAllValues().get(1); assertNotNull(event1); @@ -176,13 +184,13 @@ public class SSESessionHandlerTest { sseSessionHandler.init(); sseSessionHandler.sendDataMessage(""); - verifyNoMoreInteractions(eventOutput); + verifyNoMoreInteractions(eventSink); } @Test public void sendDataMessageWithEnabledFragAndLargeMessage1() throws IOException { final SSESessionHandler sseSessionHandler = setup(100, 0); - doReturn(false).when(eventOutput).isClosed(); + doReturn(false).when(eventSink).isClosed(); sseSessionHandler.init(); // there should be 10 fragments of length 100 characters @@ -191,7 +199,7 @@ public class SSESessionHandlerTest { ArgumentCaptor cap = ArgumentCaptor.forClass(OutboundEvent.class); // SSE automatically send fragmented packet ended with new line character due to eventOutput // have only 1 write call - verify(eventOutput, times(1)).write(cap.capture()); + verify(eventSink, times(1)).send(cap.capture()); OutboundEvent event = cap.getAllValues().get(0); assertNotNull(event); String[] lines = ((String) event.getData()).split("\r\n|\r|\n"); -- 2.36.6