From: Robert Varga Date: Mon, 17 May 2021 14:00:42 +0000 (+0200) Subject: Switch to using SseEventSink X-Git-Tag: v1.13.3~8 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=commitdiff_plain;h=fed3396f993be8175e737477bb4c5cde507fddf9;p=netconf.git Switch to using SseEventSink JAX-RS 2.1 has a standard way of working with SSE, do not use Jersey-specific APIs (except in test). JIRA: NETCONF-775 Change-Id: I46c2b80be52d2313ab9193361da788f41923c175 Signed-off-by: Robert Varga (cherry picked from commit c2baf51aa870d005561658057b0674d66177d813) --- diff --git a/restconf/restconf-nb-rfc8040/pom.xml b/restconf/restconf-nb-rfc8040/pom.xml index a3ed4e42bb..4f8ca13d88 100644 --- a/restconf/restconf-nb-rfc8040/pom.xml +++ b/restconf/restconf-nb-rfc8040/pom.xml @@ -145,10 +145,6 @@ org.eclipse.jetty.websocket websocket-server - - org.glassfish.jersey.media - jersey-media-sse - @@ -161,6 +157,11 @@ jersey-hk2 test + + org.glassfish.jersey.media + jersey-media-sse + test + org.opendaylight.mdsal mdsal-binding-test-utils diff --git a/restconf/restconf-nb-rfc8040/src/main/java/org/opendaylight/restconf/nb/rfc8040/rests/services/api/RestconfDataStreamService.java b/restconf/restconf-nb-rfc8040/src/main/java/org/opendaylight/restconf/nb/rfc8040/rests/services/api/RestconfDataStreamService.java index 4a6d6fe30c..b9f58fb9c7 100644 --- a/restconf/restconf-nb-rfc8040/src/main/java/org/opendaylight/restconf/nb/rfc8040/rests/services/api/RestconfDataStreamService.java +++ b/restconf/restconf-nb-rfc8040/src/main/java/org/opendaylight/restconf/nb/rfc8040/rests/services/api/RestconfDataStreamService.java @@ -15,21 +15,19 @@ import javax.ws.rs.Produces; import javax.ws.rs.core.Context; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.UriInfo; -import org.glassfish.jersey.media.sse.EventOutput; +import javax.ws.rs.sse.Sse; +import javax.ws.rs.sse.SseEventSink; public interface RestconfDataStreamService { /** * Get target data resource. * - * @param identifier - * path to target - * @param uriInfo - * URI info - * @return {@link EventOutput} + * @param identifier path to target + * @param uriInfo URI info */ @GET @Path("/{identifier:.+}") @Produces(MediaType.SERVER_SENT_EVENTS) - EventOutput getSSE(@Encoded @PathParam("identifier") String identifier, @Context UriInfo uriInfo); - + void getSSE(@Encoded @PathParam("identifier") String identifier, @Context UriInfo uriInfo, + @Context SseEventSink sink, @Context Sse sse); } diff --git a/restconf/restconf-nb-rfc8040/src/main/java/org/opendaylight/restconf/nb/rfc8040/rests/services/impl/RestconfDataStreamServiceImpl.java b/restconf/restconf-nb-rfc8040/src/main/java/org/opendaylight/restconf/nb/rfc8040/rests/services/impl/RestconfDataStreamServiceImpl.java index 6b06b5278a..b716da3ec2 100644 --- a/restconf/restconf-nb-rfc8040/src/main/java/org/opendaylight/restconf/nb/rfc8040/rests/services/impl/RestconfDataStreamServiceImpl.java +++ b/restconf/restconf-nb-rfc8040/src/main/java/org/opendaylight/restconf/nb/rfc8040/rests/services/impl/RestconfDataStreamServiceImpl.java @@ -12,7 +12,8 @@ import java.util.concurrent.ScheduledExecutorService; import javax.inject.Inject; import javax.inject.Singleton; import javax.ws.rs.core.UriInfo; -import org.glassfish.jersey.media.sse.EventOutput; +import javax.ws.rs.sse.Sse; +import javax.ws.rs.sse.SseEventSink; import org.opendaylight.restconf.common.errors.RestconfDocumentedException; import org.opendaylight.restconf.common.errors.RestconfError.ErrorTag; import org.opendaylight.restconf.common.errors.RestconfError.ErrorType; @@ -44,7 +45,7 @@ public class RestconfDataStreamServiceImpl implements RestconfDataStreamService } @Override - public EventOutput getSSE(final String identifier, final UriInfo uriInfo) { + public void getSSE(final String identifier, final UriInfo uriInfo, final SseEventSink sink, final Sse sse) { final String streamName = ListenersBroker.createStreamNameFromUri(identifier); final Optional listener = listenersBroker.getListenerFor(streamName); @@ -54,10 +55,11 @@ public class RestconfDataStreamServiceImpl implements RestconfDataStreamService } LOG.debug("Listener for stream with name {} has been found, SSE session handler will be created.", streamName); - final EventOutput eventOutput = new EventOutput(); - final SSESessionHandler handler = new SSESessionHandler(executorService, eventOutput, listener.get(), + + // FIXME: invert control here: we should call 'listener.addSession()', which in turn should call + // handler.init()/handler.close() + final SSESessionHandler handler = new SSESessionHandler(executorService, sink, sse, listener.get(), maximumFragmentLength, heartbeatInterval); handler.init(); - return eventOutput; } } diff --git a/restconf/restconf-nb-rfc8040/src/main/java/org/opendaylight/restconf/nb/rfc8040/services/wrapper/ServicesNotifWrapper.java b/restconf/restconf-nb-rfc8040/src/main/java/org/opendaylight/restconf/nb/rfc8040/services/wrapper/ServicesNotifWrapper.java index 7d6b76715d..2f34564472 100644 --- a/restconf/restconf-nb-rfc8040/src/main/java/org/opendaylight/restconf/nb/rfc8040/services/wrapper/ServicesNotifWrapper.java +++ b/restconf/restconf-nb-rfc8040/src/main/java/org/opendaylight/restconf/nb/rfc8040/services/wrapper/ServicesNotifWrapper.java @@ -9,7 +9,8 @@ package org.opendaylight.restconf.nb.rfc8040.services.wrapper; import javax.ws.rs.Path; import javax.ws.rs.core.UriInfo; -import org.glassfish.jersey.media.sse.EventOutput; +import javax.ws.rs.sse.Sse; +import javax.ws.rs.sse.SseEventSink; import org.opendaylight.restconf.nb.rfc8040.rests.services.api.RestconfDataStreamService; import org.opendaylight.restconf.nb.rfc8040.rests.services.impl.RestconfDataStreamServiceImpl; import org.opendaylight.restconf.nb.rfc8040.streams.sse.SSEInitializer; @@ -25,17 +26,17 @@ public final class ServicesNotifWrapper implements RestconfDataStreamService { private final RestconfDataStreamService delegRestStream; - private ServicesNotifWrapper(RestconfDataStreamService delegRestStream) { + private ServicesNotifWrapper(final RestconfDataStreamService delegRestStream) { this.delegRestStream = delegRestStream; } - public static ServicesNotifWrapper newInstance(SSEInitializer configuration) { + public static ServicesNotifWrapper newInstance(final SSEInitializer configuration) { RestconfDataStreamService delegRestStream = new RestconfDataStreamServiceImpl(configuration); return new ServicesNotifWrapper(delegRestStream); } @Override - public EventOutput getSSE(String identifier, UriInfo uriInfo) { - return this.delegRestStream.getSSE(identifier, uriInfo); + public void getSSE(final String identifier, final UriInfo uriInfo, final SseEventSink sink, final Sse sse) { + this.delegRestStream.getSSE(identifier, uriInfo, sink, sse); } } diff --git a/restconf/restconf-nb-rfc8040/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/listeners/ListenersBroker.java b/restconf/restconf-nb-rfc8040/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/listeners/ListenersBroker.java index d1dadcf22e..b27502ac28 100644 --- a/restconf/restconf-nb-rfc8040/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/listeners/ListenersBroker.java +++ b/restconf/restconf-nb-rfc8040/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/listeners/ListenersBroker.java @@ -49,6 +49,7 @@ public final class ListenersBroker { * * @return Reusable instance of {@link ListenersBroker}. */ + // FIXME: remove this global singleton public static synchronized ListenersBroker getInstance() { if (listenersBroker == null) { listenersBroker = new ListenersBroker(); @@ -303,7 +304,7 @@ public final class ListenersBroker { } @SuppressWarnings({"checkstyle:IllegalCatch"}) - private void removeAndCloseNotificationListenerTemplate(NotificationListenerAdapter listener) { + private void removeAndCloseNotificationListenerTemplate(final NotificationListenerAdapter listener) { try { requireNonNull(listener).close(); if (notificationListeners.inverse().remove(listener) == null) { diff --git a/restconf/restconf-nb-rfc8040/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/sse/SSESessionHandler.java b/restconf/restconf-nb-rfc8040/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/sse/SSESessionHandler.java index da1ee72e4c..26519048b8 100644 --- a/restconf/restconf-nb-rfc8040/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/sse/SSESessionHandler.java +++ b/restconf/restconf-nb-rfc8040/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/sse/SSESessionHandler.java @@ -9,12 +9,11 @@ package org.opendaylight.restconf.nb.rfc8040.streams.sse; import com.google.common.base.CharMatcher; import com.google.common.base.Strings; -import java.io.IOException; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; -import org.glassfish.jersey.media.sse.EventOutput; -import org.glassfish.jersey.media.sse.OutboundEvent; +import javax.ws.rs.sse.Sse; +import javax.ws.rs.sse.SseEventSink; import org.opendaylight.restconf.nb.rfc8040.streams.SessionHandlerInterface; import org.opendaylight.restconf.nb.rfc8040.streams.listeners.BaseListenerInterface; import org.slf4j.Logger; @@ -34,7 +33,9 @@ public class SSESessionHandler implements SessionHandlerInterface { private final BaseListenerInterface listener; private final int maximumFragmentLength; private final int heartbeatInterval; - private final EventOutput output; + private final SseEventSink sink; + private final Sse sse; + private ScheduledFuture pingProcess; /** @@ -53,10 +54,11 @@ public class SSESessionHandler implements SessionHandlerInterface { * @param heartbeatInterval Interval in milliseconds of sending of ping control frames to remote endpoint to keep * session up. Ping control frames are disabled if this parameter is set to 0. */ - public SSESessionHandler(final ScheduledExecutorService executorService, final EventOutput output, + public SSESessionHandler(final ScheduledExecutorService executorService, final SseEventSink sink, final Sse sse, final BaseListenerInterface listener, final int maximumFragmentLength, final int heartbeatInterval) { this.executorService = executorService; - this.output = output; + this.sse = sse; + this.sink = sink; this.listener = listener; this.maximumFragmentLength = maximumFragmentLength; this.heartbeatInterval = heartbeatInterval; @@ -83,11 +85,10 @@ public class SSESessionHandler implements SessionHandlerInterface { } /** - * Sending of string message to outbound Server-Sent Events channel - * {@link org.glassfish.jersey.media.sse.EventOutput}. SSE is automatically split to fragments with new line - * character. If the maximum fragment length is set to non-zero positive value and input message exceeds this - * value, message is manually fragmented to multiple message fragments which are send individually. Previous - * fragmentation is removed. + * Sending of string message to outbound Server-Sent Events channel {@link SseEventSink}. SSE is automatically split + * to fragments with new line character. If the maximum fragment length is set to non-zero positive value and input + * message exceeds this value, message is manually fragmented to multiple message fragments which are send + * individually. Previous fragmentation is removed. * * @param message Message data to be send over web-socket session. */ @@ -97,32 +98,12 @@ public class SSESessionHandler implements SessionHandlerInterface { // FIXME: should this be tolerated? return; } - if (output.isClosed()) { - close(); - return; - } - if (maximumFragmentLength != 0 && message.length() > maximumFragmentLength) { - sendMessage(splitMessageToFragments(message)); + if (!sink.isClosed()) { + final String toSend = maximumFragmentLength != 0 && message.length() > maximumFragmentLength + ? splitMessageToFragments(message) : message; + sink.send(sse.newEvent(toSend)); } else { - sendMessage(message); - } - } - - private void sendMessage(final String message) { - try { - output.write(new OutboundEvent.Builder().data(String.class, message).build()); - } catch (IOException e) { - LOG.warn("Connection from client {} is closed", this); - LOG.debug("Connection from client is closed:", e); - } - } - - private void sendComment(final String message) { - try { - output.write(new OutboundEvent.Builder().comment(message).build()); - } catch (IOException e) { - LOG.warn("Connection from client {} is closed", this); - LOG.debug("Connection from client is closed:", e); + close(); } } @@ -144,12 +125,12 @@ public class SSESessionHandler implements SessionHandlerInterface { } private synchronized void sendPingMessage() { - if (output.isClosed()) { + if (!sink.isClosed()) { + LOG.debug("sending PING:{}", PING_PAYLOAD); + sink.send(sse.newEventBuilder().comment(PING_PAYLOAD).build()); + } else { close(); - return; } - LOG.debug("sending PING:{}", PING_PAYLOAD); - sendComment(PING_PAYLOAD); } private void stopPingProcess() { @@ -160,12 +141,12 @@ public class SSESessionHandler implements SessionHandlerInterface { @Override public synchronized boolean isConnected() { - return !output.isClosed(); + return !sink.isClosed(); } // TODO:return some type of identification of connection @Override public String toString() { - return output.toString(); + return sink.toString(); } } diff --git a/restconf/restconf-nb-rfc8040/src/test/java/org/opendaylight/restconf/nb/rfc8040/streams/sse/SSESessionHandlerTest.java b/restconf/restconf-nb-rfc8040/src/test/java/org/opendaylight/restconf/nb/rfc8040/streams/sse/SSESessionHandlerTest.java index f470e1ba4c..5916a607c5 100644 --- a/restconf/restconf-nb-rfc8040/src/test/java/org/opendaylight/restconf/nb/rfc8040/streams/sse/SSESessionHandlerTest.java +++ b/restconf/restconf-nb-rfc8040/src/test/java/org/opendaylight/restconf/nb/rfc8040/streams/sse/SSESessionHandlerTest.java @@ -12,6 +12,8 @@ import static org.junit.Assert.assertNotNull; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doCallRealMethod; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; @@ -22,7 +24,8 @@ import java.io.IOException; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; -import org.glassfish.jersey.media.sse.EventOutput; +import javax.ws.rs.sse.Sse; +import javax.ws.rs.sse.SseEventSink; import org.glassfish.jersey.media.sse.OutboundEvent; import org.junit.Test; import org.junit.runner.RunWith; @@ -40,10 +43,15 @@ public class SSESessionHandlerTest { @Mock private ScheduledFuture pingFuture; @Mock - private EventOutput eventOutput; + private SseEventSink eventSink; + @Mock + private Sse sse; private SSESessionHandler setup(final int maxFragmentSize, final int heartbeatInterval) { - final SSESessionHandler sseSessionHandler = new SSESessionHandler(executorService, eventOutput, listener, + doCallRealMethod().when(sse).newEvent(any()); + doAnswer(inv -> new OutboundEvent.Builder()).when(sse).newEventBuilder(); + + final SSESessionHandler sseSessionHandler = new SSESessionHandler(executorService, eventSink, sse, listener, maxFragmentSize, heartbeatInterval); doReturn(pingFuture).when(executorService) .scheduleWithFixedDelay(any(Runnable.class), eq((long) heartbeatInterval), eq((long) heartbeatInterval), @@ -119,13 +127,13 @@ public class SSESessionHandlerTest { @Test public void sendDataMessageWithDisabledFragmentation() throws IOException { final SSESessionHandler sseSessionHandler = setup(0, 0); - doReturn(false).when(eventOutput).isClosed(); + doReturn(false).when(eventSink).isClosed(); sseSessionHandler.init(); final String testMessage = generateRandomStringOfLength(100); sseSessionHandler.sendDataMessage(testMessage); ArgumentCaptor cap = ArgumentCaptor.forClass(OutboundEvent.class); - verify(eventOutput, times(1)).write(cap.capture()); + verify(eventSink, times(1)).send(cap.capture()); OutboundEvent event = cap.getAllValues().get(0); assertNotNull(event); assertEquals(event.getData(), testMessage); @@ -134,19 +142,19 @@ public class SSESessionHandlerTest { @Test public void sendDataMessageWithDisabledFragAndDeadSession() throws IOException { final SSESessionHandler sseSessionHandler = setup(0, 0); - doReturn(true).when(eventOutput).isClosed(); + doReturn(true).when(eventSink).isClosed(); sseSessionHandler.init(); final String testMessage = generateRandomStringOfLength(11); sseSessionHandler.sendDataMessage(testMessage); ArgumentCaptor cap = ArgumentCaptor.forClass(OutboundEvent.class); - verify(eventOutput, times(0)).write(cap.capture()); + verify(eventSink, times(0)).send(cap.capture()); } @Test public void sendDataMessageWithEnabledFragAndSmallMessage() throws IOException { final SSESessionHandler sseSessionHandler = setup(100, 0); - doReturn(false).when(eventOutput).isClosed(); + doReturn(false).when(eventSink).isClosed(); sseSessionHandler.init(); // in both cases, fragmentation should not be applied @@ -157,7 +165,7 @@ public class SSESessionHandlerTest { ArgumentCaptor cap = ArgumentCaptor.forClass(OutboundEvent.class); // without fragmentation there will be 2 write calls - verify(eventOutput, times(2)).write(cap.capture()); + verify(eventSink, times(2)).send(cap.capture()); OutboundEvent event1 = cap.getAllValues().get(0); OutboundEvent event2 = cap.getAllValues().get(1); assertNotNull(event1); @@ -176,13 +184,13 @@ public class SSESessionHandlerTest { sseSessionHandler.init(); sseSessionHandler.sendDataMessage(""); - verifyNoMoreInteractions(eventOutput); + verifyNoMoreInteractions(eventSink); } @Test public void sendDataMessageWithEnabledFragAndLargeMessage1() throws IOException { final SSESessionHandler sseSessionHandler = setup(100, 0); - doReturn(false).when(eventOutput).isClosed(); + doReturn(false).when(eventSink).isClosed(); sseSessionHandler.init(); // there should be 10 fragments of length 100 characters @@ -191,7 +199,7 @@ public class SSESessionHandlerTest { ArgumentCaptor cap = ArgumentCaptor.forClass(OutboundEvent.class); // SSE automatically send fragmented packet ended with new line character due to eventOutput // have only 1 write call - verify(eventOutput, times(1)).write(cap.capture()); + verify(eventSink, times(1)).send(cap.capture()); OutboundEvent event = cap.getAllValues().get(0); assertNotNull(event); String[] lines = ((String) event.getData()).split("\r\n|\r|\n");