Switch to using SseEventSink 81/96281/1
authorRobert Varga <robert.varga@pantheon.tech>
Mon, 17 May 2021 14:00:42 +0000 (16:00 +0200)
committerRobert Varga <robert.varga@pantheon.tech>
Thu, 20 May 2021 08:33:55 +0000 (10:33 +0200)
JAX-RS 2.1 has a standard way of working with SSE, do not use
Jersey-specific APIs (except in test).

JIRA: NETCONF-775
Change-Id: I46c2b80be52d2313ab9193361da788f41923c175
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
(cherry picked from commit c2baf51aa870d005561658057b0674d66177d813)

restconf/restconf-nb-rfc8040/pom.xml
restconf/restconf-nb-rfc8040/src/main/java/org/opendaylight/restconf/nb/rfc8040/rests/services/api/RestconfDataStreamService.java
restconf/restconf-nb-rfc8040/src/main/java/org/opendaylight/restconf/nb/rfc8040/rests/services/impl/RestconfDataStreamServiceImpl.java
restconf/restconf-nb-rfc8040/src/main/java/org/opendaylight/restconf/nb/rfc8040/services/wrapper/ServicesNotifWrapper.java
restconf/restconf-nb-rfc8040/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/listeners/ListenersBroker.java
restconf/restconf-nb-rfc8040/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/sse/SSESessionHandler.java
restconf/restconf-nb-rfc8040/src/test/java/org/opendaylight/restconf/nb/rfc8040/streams/sse/SSESessionHandlerTest.java

index a3ed4e42bbd96d6285c0234bb42dc63bd36f2fba..4f8ca13d88e0f90dc1e239827c8b6c9a58507d28 100644 (file)
       <groupId>org.eclipse.jetty.websocket</groupId>
       <artifactId>websocket-server</artifactId>
     </dependency>
-    <dependency>
-      <groupId>org.glassfish.jersey.media</groupId>
-      <artifactId>jersey-media-sse</artifactId>
-    </dependency>
 
     <!-- Testing Dependencies -->
     <dependency>
       <artifactId>jersey-hk2</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.glassfish.jersey.media</groupId>
+      <artifactId>jersey-media-sse</artifactId>
+      <scope>test</scope>
+    </dependency>
     <dependency>
       <groupId>org.opendaylight.mdsal</groupId>
       <artifactId>mdsal-binding-test-utils</artifactId>
index 4a6d6fe30cb74b4ef6d75b7a6d516fb10543f6da..b9f58fb9c7e5a5e93bae7f98fab39928319acb10 100644 (file)
@@ -15,21 +15,19 @@ import javax.ws.rs.Produces;
 import javax.ws.rs.core.Context;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.UriInfo;
-import org.glassfish.jersey.media.sse.EventOutput;
+import javax.ws.rs.sse.Sse;
+import javax.ws.rs.sse.SseEventSink;
 
 public interface RestconfDataStreamService {
     /**
      * Get target data resource.
      *
-     * @param identifier
-     *            path to target
-     * @param uriInfo
-     *            URI info
-     * @return {@link EventOutput}
+     * @param identifier path to target
+     * @param uriInfo URI info
      */
     @GET
     @Path("/{identifier:.+}")
     @Produces(MediaType.SERVER_SENT_EVENTS)
-    EventOutput getSSE(@Encoded @PathParam("identifier") String identifier, @Context UriInfo uriInfo);
-
+    void getSSE(@Encoded @PathParam("identifier") String identifier, @Context UriInfo uriInfo,
+        @Context SseEventSink sink, @Context Sse sse);
 }
index 6b06b5278a0b6c6e0eed7259a5de8dfb0fcfef32..b716da3ec256afd03db21b9df3b33b02674850e0 100644 (file)
@@ -12,7 +12,8 @@ import java.util.concurrent.ScheduledExecutorService;
 import javax.inject.Inject;
 import javax.inject.Singleton;
 import javax.ws.rs.core.UriInfo;
-import org.glassfish.jersey.media.sse.EventOutput;
+import javax.ws.rs.sse.Sse;
+import javax.ws.rs.sse.SseEventSink;
 import org.opendaylight.restconf.common.errors.RestconfDocumentedException;
 import org.opendaylight.restconf.common.errors.RestconfError.ErrorTag;
 import org.opendaylight.restconf.common.errors.RestconfError.ErrorType;
@@ -44,7 +45,7 @@ public class RestconfDataStreamServiceImpl implements RestconfDataStreamService
     }
 
     @Override
-    public EventOutput getSSE(final String identifier, final UriInfo uriInfo) {
+    public void getSSE(final String identifier, final UriInfo uriInfo, final SseEventSink sink, final Sse sse) {
         final String streamName = ListenersBroker.createStreamNameFromUri(identifier);
         final Optional<BaseListenerInterface> listener = listenersBroker.getListenerFor(streamName);
 
@@ -54,10 +55,11 @@ public class RestconfDataStreamServiceImpl implements RestconfDataStreamService
         }
 
         LOG.debug("Listener for stream with name {} has been found, SSE session handler will be created.", streamName);
-        final EventOutput eventOutput = new EventOutput();
-        final SSESessionHandler handler = new SSESessionHandler(executorService, eventOutput, listener.get(),
+
+        // FIXME: invert control here: we should call 'listener.addSession()', which in turn should call
+        //        handler.init()/handler.close()
+        final SSESessionHandler handler = new SSESessionHandler(executorService, sink, sse, listener.get(),
             maximumFragmentLength, heartbeatInterval);
         handler.init();
-        return eventOutput;
     }
 }
index 7d6b76715d9e123cf8f4fdb1fb444f306731b0fa..2f34564472ba08704a6196213bb612c0644accac 100644 (file)
@@ -9,7 +9,8 @@ package org.opendaylight.restconf.nb.rfc8040.services.wrapper;
 
 import javax.ws.rs.Path;
 import javax.ws.rs.core.UriInfo;
-import org.glassfish.jersey.media.sse.EventOutput;
+import javax.ws.rs.sse.Sse;
+import javax.ws.rs.sse.SseEventSink;
 import org.opendaylight.restconf.nb.rfc8040.rests.services.api.RestconfDataStreamService;
 import org.opendaylight.restconf.nb.rfc8040.rests.services.impl.RestconfDataStreamServiceImpl;
 import org.opendaylight.restconf.nb.rfc8040.streams.sse.SSEInitializer;
@@ -25,17 +26,17 @@ public final class ServicesNotifWrapper implements RestconfDataStreamService {
 
     private final RestconfDataStreamService delegRestStream;
 
-    private ServicesNotifWrapper(RestconfDataStreamService delegRestStream) {
+    private ServicesNotifWrapper(final RestconfDataStreamService delegRestStream) {
         this.delegRestStream = delegRestStream;
     }
 
-    public static ServicesNotifWrapper newInstance(SSEInitializer configuration) {
+    public static ServicesNotifWrapper newInstance(final SSEInitializer configuration) {
         RestconfDataStreamService delegRestStream = new RestconfDataStreamServiceImpl(configuration);
         return new ServicesNotifWrapper(delegRestStream);
     }
 
     @Override
-    public EventOutput getSSE(String identifier, UriInfo uriInfo) {
-        return this.delegRestStream.getSSE(identifier, uriInfo);
+    public void getSSE(final String identifier, final UriInfo uriInfo, final SseEventSink sink, final Sse sse) {
+        this.delegRestStream.getSSE(identifier, uriInfo, sink, sse);
     }
 }
index d1dadcf22e7eea547db73b0baf0b0deb65f09ef3..b27502ac28be36b916e27acdbd6cc2be6b7e1f48 100644 (file)
@@ -49,6 +49,7 @@ public final class ListenersBroker {
      *
      * @return Reusable instance of {@link ListenersBroker}.
      */
+    // FIXME: remove this global singleton
     public static synchronized ListenersBroker getInstance() {
         if (listenersBroker == null) {
             listenersBroker = new ListenersBroker();
@@ -303,7 +304,7 @@ public final class ListenersBroker {
     }
 
     @SuppressWarnings({"checkstyle:IllegalCatch"})
-    private void removeAndCloseNotificationListenerTemplate(NotificationListenerAdapter listener) {
+    private void removeAndCloseNotificationListenerTemplate(final NotificationListenerAdapter listener) {
         try {
             requireNonNull(listener).close();
             if (notificationListeners.inverse().remove(listener) == null) {
index da1ee72e4c4cb478b438a2fa6763ca7dc428df71..26519048b88911ef5d36d783efcb270e7741a596 100644 (file)
@@ -9,12 +9,11 @@ package org.opendaylight.restconf.nb.rfc8040.streams.sse;
 
 import com.google.common.base.CharMatcher;
 import com.google.common.base.Strings;
-import java.io.IOException;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
-import org.glassfish.jersey.media.sse.EventOutput;
-import org.glassfish.jersey.media.sse.OutboundEvent;
+import javax.ws.rs.sse.Sse;
+import javax.ws.rs.sse.SseEventSink;
 import org.opendaylight.restconf.nb.rfc8040.streams.SessionHandlerInterface;
 import org.opendaylight.restconf.nb.rfc8040.streams.listeners.BaseListenerInterface;
 import org.slf4j.Logger;
@@ -34,7 +33,9 @@ public class SSESessionHandler implements SessionHandlerInterface {
     private final BaseListenerInterface listener;
     private final int maximumFragmentLength;
     private final int heartbeatInterval;
-    private final EventOutput output;
+    private final SseEventSink sink;
+    private final Sse sse;
+
     private ScheduledFuture<?> pingProcess;
 
     /**
@@ -53,10 +54,11 @@ public class SSESessionHandler implements SessionHandlerInterface {
      * @param heartbeatInterval Interval in milliseconds of sending of ping control frames to remote endpoint to keep
      *            session up. Ping control frames are disabled if this parameter is set to 0.
      */
-    public SSESessionHandler(final ScheduledExecutorService executorService, final EventOutput output,
+    public SSESessionHandler(final ScheduledExecutorService executorService, final SseEventSink sink, final Sse sse,
             final BaseListenerInterface listener, final int maximumFragmentLength, final int heartbeatInterval) {
         this.executorService = executorService;
-        this.output = output;
+        this.sse = sse;
+        this.sink = sink;
         this.listener = listener;
         this.maximumFragmentLength = maximumFragmentLength;
         this.heartbeatInterval = heartbeatInterval;
@@ -83,11 +85,10 @@ public class SSESessionHandler implements SessionHandlerInterface {
     }
 
     /**
-     * Sending of string message to outbound Server-Sent Events channel
-     * {@link org.glassfish.jersey.media.sse.EventOutput}. SSE is automatically split to fragments with new line
-     * character. If the maximum fragment length is set to non-zero positive value and input message exceeds this
-     * value, message is manually fragmented to multiple message fragments which are send individually. Previous
-     * fragmentation is removed.
+     * Sending of string message to outbound Server-Sent Events channel {@link SseEventSink}. SSE is automatically split
+     * to fragments with new line character. If the maximum fragment length is set to non-zero positive value and input
+     * message exceeds this value, message is manually fragmented to multiple message fragments which are send
+     * individually. Previous fragmentation is removed.
      *
      * @param message Message data to be send over web-socket session.
      */
@@ -97,32 +98,12 @@ public class SSESessionHandler implements SessionHandlerInterface {
             // FIXME: should this be tolerated?
             return;
         }
-        if (output.isClosed()) {
-            close();
-            return;
-        }
-        if (maximumFragmentLength != 0 && message.length() > maximumFragmentLength) {
-            sendMessage(splitMessageToFragments(message));
+        if (!sink.isClosed()) {
+            final String toSend = maximumFragmentLength != 0 && message.length() > maximumFragmentLength
+                ? splitMessageToFragments(message) : message;
+            sink.send(sse.newEvent(toSend));
         } else {
-            sendMessage(message);
-        }
-    }
-
-    private void sendMessage(final String message) {
-        try {
-            output.write(new OutboundEvent.Builder().data(String.class, message).build());
-        } catch (IOException e) {
-            LOG.warn("Connection from client {} is closed", this);
-            LOG.debug("Connection from client is closed:", e);
-        }
-    }
-
-    private void sendComment(final String message) {
-        try {
-            output.write(new OutboundEvent.Builder().comment(message).build());
-        } catch (IOException e) {
-            LOG.warn("Connection from client {} is closed", this);
-            LOG.debug("Connection from client is closed:", e);
+            close();
         }
     }
 
@@ -144,12 +125,12 @@ public class SSESessionHandler implements SessionHandlerInterface {
     }
 
     private synchronized void sendPingMessage() {
-        if (output.isClosed()) {
+        if (!sink.isClosed()) {
+            LOG.debug("sending PING:{}", PING_PAYLOAD);
+            sink.send(sse.newEventBuilder().comment(PING_PAYLOAD).build());
+        } else {
             close();
-            return;
         }
-        LOG.debug("sending PING:{}", PING_PAYLOAD);
-        sendComment(PING_PAYLOAD);
     }
 
     private void stopPingProcess() {
@@ -160,12 +141,12 @@ public class SSESessionHandler implements SessionHandlerInterface {
 
     @Override
     public synchronized boolean isConnected() {
-        return !output.isClosed();
+        return !sink.isClosed();
     }
 
     // TODO:return some type of identification of connection
     @Override
     public String toString() {
-        return output.toString();
+        return sink.toString();
     }
 }
index f470e1ba4cb67e1e040eb8aac5833fb74e4721f8..5916a607c598e7c2fdd1b142fe7542db1b446faa 100644 (file)
@@ -12,6 +12,8 @@ import static org.junit.Assert.assertNotNull;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyBoolean;
 import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doCallRealMethod;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.times;
@@ -22,7 +24,8 @@ import java.io.IOException;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
-import org.glassfish.jersey.media.sse.EventOutput;
+import javax.ws.rs.sse.Sse;
+import javax.ws.rs.sse.SseEventSink;
 import org.glassfish.jersey.media.sse.OutboundEvent;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -40,10 +43,15 @@ public class SSESessionHandlerTest {
     @Mock
     private ScheduledFuture<?> pingFuture;
     @Mock
-    private EventOutput eventOutput;
+    private SseEventSink eventSink;
+    @Mock
+    private Sse sse;
 
     private SSESessionHandler setup(final int maxFragmentSize, final int heartbeatInterval) {
-        final SSESessionHandler sseSessionHandler = new SSESessionHandler(executorService, eventOutput, listener,
+        doCallRealMethod().when(sse).newEvent(any());
+        doAnswer(inv -> new OutboundEvent.Builder()).when(sse).newEventBuilder();
+
+        final SSESessionHandler sseSessionHandler = new SSESessionHandler(executorService, eventSink, sse, listener,
             maxFragmentSize, heartbeatInterval);
         doReturn(pingFuture).when(executorService)
             .scheduleWithFixedDelay(any(Runnable.class), eq((long) heartbeatInterval), eq((long) heartbeatInterval),
@@ -119,13 +127,13 @@ public class SSESessionHandlerTest {
     @Test
     public void sendDataMessageWithDisabledFragmentation() throws IOException {
         final SSESessionHandler sseSessionHandler = setup(0, 0);
-        doReturn(false).when(eventOutput).isClosed();
+        doReturn(false).when(eventSink).isClosed();
         sseSessionHandler.init();
         final String testMessage = generateRandomStringOfLength(100);
         sseSessionHandler.sendDataMessage(testMessage);
 
         ArgumentCaptor<OutboundEvent> cap = ArgumentCaptor.forClass(OutboundEvent.class);
-        verify(eventOutput, times(1)).write(cap.capture());
+        verify(eventSink, times(1)).send(cap.capture());
         OutboundEvent event = cap.getAllValues().get(0);
         assertNotNull(event);
         assertEquals(event.getData(), testMessage);
@@ -134,19 +142,19 @@ public class SSESessionHandlerTest {
     @Test
     public void sendDataMessageWithDisabledFragAndDeadSession() throws IOException {
         final SSESessionHandler sseSessionHandler = setup(0, 0);
-        doReturn(true).when(eventOutput).isClosed();
+        doReturn(true).when(eventSink).isClosed();
         sseSessionHandler.init();
 
         final String testMessage = generateRandomStringOfLength(11);
         sseSessionHandler.sendDataMessage(testMessage);
         ArgumentCaptor<OutboundEvent> cap = ArgumentCaptor.forClass(OutboundEvent.class);
-        verify(eventOutput, times(0)).write(cap.capture());
+        verify(eventSink, times(0)).send(cap.capture());
     }
 
     @Test
     public void sendDataMessageWithEnabledFragAndSmallMessage() throws IOException {
         final SSESessionHandler sseSessionHandler = setup(100, 0);
-        doReturn(false).when(eventOutput).isClosed();
+        doReturn(false).when(eventSink).isClosed();
         sseSessionHandler.init();
 
         // in both cases, fragmentation should not be applied
@@ -157,7 +165,7 @@ public class SSESessionHandlerTest {
 
         ArgumentCaptor<OutboundEvent> cap = ArgumentCaptor.forClass(OutboundEvent.class);
         // without fragmentation there will be 2 write calls
-        verify(eventOutput, times(2)).write(cap.capture());
+        verify(eventSink, times(2)).send(cap.capture());
         OutboundEvent event1 = cap.getAllValues().get(0);
         OutboundEvent event2 = cap.getAllValues().get(1);
         assertNotNull(event1);
@@ -176,13 +184,13 @@ public class SSESessionHandlerTest {
         sseSessionHandler.init();
 
         sseSessionHandler.sendDataMessage("");
-        verifyNoMoreInteractions(eventOutput);
+        verifyNoMoreInteractions(eventSink);
     }
 
     @Test
     public void sendDataMessageWithEnabledFragAndLargeMessage1() throws IOException {
         final SSESessionHandler sseSessionHandler = setup(100, 0);
-        doReturn(false).when(eventOutput).isClosed();
+        doReturn(false).when(eventSink).isClosed();
         sseSessionHandler.init();
 
         // there should be 10 fragments of length 100 characters
@@ -191,7 +199,7 @@ public class SSESessionHandlerTest {
         ArgumentCaptor<OutboundEvent> cap = ArgumentCaptor.forClass(OutboundEvent.class);
         // SSE automatically send fragmented packet ended with new line character due to eventOutput
         // have only 1 write call
-        verify(eventOutput, times(1)).write(cap.capture());
+        verify(eventSink, times(1)).send(cap.capture());
         OutboundEvent event = cap.getAllValues().get(0);
         assertNotNull(event);
         String[] lines = ((String) event.getData()).split("\r\n|\r|\n");