Remove BaseListenerInterface 33/108833/3
authorRobert Varga <robert.varga@pantheon.tech>
Fri, 3 Nov 2023 06:46:48 +0000 (07:46 +0100)
committerRobert Varga <nite@hq.sk>
Fri, 3 Nov 2023 10:43:09 +0000 (10:43 +0000)
BaseListenerInterface is directly implemented by
AbstractCommonSubscriber. Inline it and rename AbstractCommonSubscriber
to AbstractStream -- thus improving overall naming.

JIRA: NETCONF-1102
Change-Id: I6ef187bd736f22bd632936fb64ffa178eff375c7
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/AbstractNotificationListenerAdaptor.java
restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/AbstractStream.java [moved from restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/AbstractCommonSubscriber.java with 86% similarity]
restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/BaseListenerInterface.java [deleted file]
restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/ListenerAdapter.java
restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/ListenersBroker.java
restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/SSESessionHandler.java
restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/WebSocketSessionHandler.java
restconf/restconf-nb/src/test/java/org/opendaylight/restconf/nb/rfc8040/streams/SSESessionHandlerTest.java
restconf/restconf-nb/src/test/java/org/opendaylight/restconf/nb/rfc8040/streams/WebSocketSessionHandlerTest.java

index 8cb0ce2bafc600e7817f9f3d084a3ce54d54d60e..29d9305f1c2f9d44c0c45dd12d7b609d822fb8d6 100644 (file)
@@ -22,7 +22,7 @@ import org.slf4j.LoggerFactory;
  * Abstract base class for functionality shared between {@link NotificationListenerAdapter} and
  * {@link DeviceNotificationListenerAdaptor}.
  */
-abstract class AbstractNotificationListenerAdaptor extends AbstractCommonSubscriber<DOMNotification>
+abstract class AbstractNotificationListenerAdaptor extends AbstractStream<DOMNotification>
         implements DOMNotificationListener {
     private static final Logger LOG = LoggerFactory.getLogger(AbstractNotificationListenerAdaptor.class);
     private static final NotificationFormatterFactory JSON_FORMATTER_FACTORY =
similarity index 86%
rename from restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/AbstractCommonSubscriber.java
rename to restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/AbstractStream.java
index 291a9baea6e8338d388e0f84b754f5ed0cb869fb..5f3ec6786c6d32836d37670dd6ef97726cebaf2f 100644 (file)
@@ -35,10 +35,10 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * Features of subscribing part of both notifications.
+ * Base superclass for all stream types.
  */
-abstract class AbstractCommonSubscriber<T> implements BaseListenerInterface {
-    private static final Logger LOG = LoggerFactory.getLogger(AbstractCommonSubscriber.class);
+abstract class AbstractStream<T> implements AutoCloseable {
+    private static final Logger LOG = LoggerFactory.getLogger(AbstractStream.class);
 
     private final EventFormatterFactory<T> formatterFactory;
     private final NotificationOutputType outputType;
@@ -57,7 +57,7 @@ abstract class AbstractCommonSubscriber<T> implements BaseListenerInterface {
     protected DatabindProvider databindProvider;
     private DOMDataBroker dataBroker;
 
-    AbstractCommonSubscriber(final String streamName, final NotificationOutputType outputType,
+    AbstractStream(final String streamName, final NotificationOutputType outputType,
             final EventFormatterFactory<T> formatterFactory, final ListenersBroker listenersBroker) {
         this.streamName = requireNonNull(streamName);
         checkArgument(!streamName.isEmpty());
@@ -68,23 +68,39 @@ abstract class AbstractCommonSubscriber<T> implements BaseListenerInterface {
         formatter = formatterFactory.emptyFormatter();
     }
 
-    @Override
+    /**
+     * Get name of stream.
+     *
+     * @return Stream name.
+     */
     public final String getStreamName() {
         return streamName;
     }
 
-    @Override
-    public final String getOutputType() {
+    /**
+     * Get output type.
+     *
+     * @return Output type (JSON or XML).
+     */
+    final String getOutputType() {
         return outputType.getName();
     }
 
-    @Override
-    public final synchronized boolean hasSubscribers() {
+    /**
+     * Checks if exists at least one {@link StreamSessionHandler} subscriber.
+     *
+     * @return {@code true} if exist at least one {@link StreamSessionHandler} subscriber, {@code false} otherwise.
+     */
+    final synchronized boolean hasSubscribers() {
         return !subscribers.isEmpty();
     }
 
-    @Override
-    public final synchronized Set<StreamSessionHandler> getSubscribers() {
+    /**
+     * Return all subscribers of listener.
+     *
+     * @return Set of all subscribers.
+     */
+    final synchronized Set<StreamSessionHandler> getSubscribers() {
         return new HashSet<>(subscribers);
     }
 
@@ -98,16 +114,24 @@ abstract class AbstractCommonSubscriber<T> implements BaseListenerInterface {
         subscribers.clear();
     }
 
-    @Override
-    public synchronized void addSubscriber(final StreamSessionHandler subscriber) {
+    /**
+     * Registers {@link StreamSessionHandler} subscriber.
+     *
+     * @param subscriber SSE or WS session handler.
+     */
+    synchronized void addSubscriber(final StreamSessionHandler subscriber) {
         final boolean isConnected = subscriber.isConnected();
         checkState(isConnected);
         LOG.debug("Subscriber {} is added.", subscriber);
         subscribers.add(subscriber);
     }
 
-    @Override
-    public synchronized void removeSubscriber(final StreamSessionHandler subscriber) {
+    /**
+     * Removes {@link StreamSessionHandler} subscriber.
+     *
+     * @param subscriber SSE or WS session handler.
+     */
+    synchronized void removeSubscriber(final StreamSessionHandler subscriber) {
         subscribers.remove(subscriber);
         LOG.debug("Subscriber {} is removed", subscriber);
         if (!hasSubscribers()) {
diff --git a/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/BaseListenerInterface.java b/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/BaseListenerInterface.java
deleted file mode 100644 (file)
index 64d637a..0000000
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.restconf.nb.rfc8040.streams;
-
-import java.util.Set;
-
-/**
- * Base interface for both listeners({@link ListenerAdapter}, {@link NotificationListenerAdapter}).
- */
-public interface BaseListenerInterface extends AutoCloseable {
-    /**
-     * Return all subscribers of listener.
-     *
-     * @return Set of all subscribers.
-     */
-    Set<StreamSessionHandler> getSubscribers();
-
-    /**
-     * Checks if exists at least one {@link StreamSessionHandler} subscriber.
-     *
-     * @return {@code true} if exist at least one {@link StreamSessionHandler} subscriber, {@code false} otherwise.
-     */
-    boolean hasSubscribers();
-
-    /**
-     * Get name of stream.
-     *
-     * @return Stream name.
-     */
-    String getStreamName();
-
-    /**
-     * Get output type.
-     *
-     * @return Output type (JSON or XML).
-     */
-    String getOutputType();
-
-    /**
-     * Registers {@link StreamSessionHandler} subscriber.
-     *
-     * @param subscriber SSE or WS session handler.
-     */
-    void addSubscriber(StreamSessionHandler subscriber);
-
-    /**
-     * Removes {@link StreamSessionHandler} subscriber.
-     *
-     * @param subscriber SSE or WS session handler.
-     */
-    void removeSubscriber(StreamSessionHandler subscriber);
-}
index d995fe5eea0bbf44b808984e240b97a5383c922b..dd971234ba56e51e7e65c3cc920f620ac4ce6531 100644 (file)
@@ -30,7 +30,7 @@ import org.slf4j.LoggerFactory;
 /**
  * {@link ListenerAdapter} is responsible to track events, which occurred by changing data in data source.
  */
-public class ListenerAdapter extends AbstractCommonSubscriber<Collection<DataTreeCandidate>>
+public class ListenerAdapter extends AbstractStream<Collection<DataTreeCandidate>>
         implements ClusteredDOMDataTreeChangeListener {
     private static final Logger LOG = LoggerFactory.getLogger(ListenerAdapter.class);
     private static final DataTreeCandidateFormatterFactory JSON_FORMATTER_FACTORY =
index 6822e3d3a499867c8c01efa29ec9583f94d21b9d..d86a94a43e95fcee56a53f8a6a2380af3c5da9c5 100644 (file)
@@ -160,7 +160,7 @@ public abstract sealed class ListenersBroker {
      * @return {@link NotificationListenerAdapter} or {@link ListenerAdapter} object wrapped in {@link Optional}
      *     or {@link Optional#empty()} if listener with specified stream name doesn't exist.
      */
-    public final @Nullable BaseListenerInterface listenerFor(final String streamName) {
+    public final @Nullable AbstractStream<?> listenerFor(final String streamName) {
         if (streamName.startsWith(RestconfStreamsConstants.NOTIFICATION_STREAM)) {
             return notificationListenerFor(streamName);
         } else if (streamName.startsWith(RestconfStreamsConstants.DATA_SUBSCRIPTION)) {
@@ -425,14 +425,14 @@ public abstract sealed class ListenersBroker {
     /**
      * Removal and closing of general listener (data-change or notification listener).
      *
-     * @param listener Listener to be closed and removed from cache.
+     * @param stream Stream to be closed and removed from cache.
      */
-    final void removeAndCloseListener(final BaseListenerInterface listener) {
-        requireNonNull(listener);
-        if (listener instanceof ListenerAdapter) {
-            removeAndCloseDataChangeListener((ListenerAdapter) listener);
-        } else if (listener instanceof NotificationListenerAdapter) {
-            removeAndCloseNotificationListener((NotificationListenerAdapter) listener);
+    final void removeAndCloseListener(final AbstractStream<?> stream) {
+        requireNonNull(stream);
+        if (stream instanceof ListenerAdapter dataChange) {
+            removeAndCloseDataChangeListener(dataChange);
+        } else if (stream instanceof NotificationListenerAdapter notification) {
+            removeAndCloseNotificationListener(notification);
         }
     }
 
index ba82788c67a98345102b672abef0869298480a13..cab53b166a9e46d703c3d4dca917f9c4988e2e92 100644 (file)
@@ -23,12 +23,11 @@ import org.slf4j.LoggerFactory;
  */
 public final class SSESessionHandler implements StreamSessionHandler {
     private static final Logger LOG = LoggerFactory.getLogger(SSESessionHandler.class);
-    private static final String PING_PAYLOAD = "ping";
-
     private static final CharMatcher CR_OR_LF = CharMatcher.anyOf("\r\n");
 
     private final ScheduledExecutorService executorService;
-    private final BaseListenerInterface listener;
+    // FIXME: this really should include subscription details like formatter etc.
+    private final AbstractStream<?> listener;
     private final int maximumFragmentLength;
     private final int heartbeatInterval;
     private final SseEventSink sink;
@@ -53,7 +52,7 @@ public final class SSESessionHandler implements StreamSessionHandler {
      *            session up. Ping control frames are disabled if this parameter is set to 0.
      */
     public SSESessionHandler(final ScheduledExecutorService executorService, final SseEventSink sink, final Sse sse,
-            final BaseListenerInterface listener, final int maximumFragmentLength, final int heartbeatInterval) {
+            final AbstractStream<?> listener, final int maximumFragmentLength, final int heartbeatInterval) {
         this.executorService = executorService;
         this.sse = sse;
         this.sink = sink;
@@ -124,8 +123,8 @@ public final class SSESessionHandler implements StreamSessionHandler {
 
     private synchronized void sendPingMessage() {
         if (!sink.isClosed()) {
-            LOG.debug("sending PING:{}", PING_PAYLOAD);
-            sink.send(sse.newEventBuilder().comment(PING_PAYLOAD).build());
+            LOG.debug("sending PING");
+            sink.send(sse.newEventBuilder().comment("ping").build());
         } else {
             close();
         }
index cd25a117478b9cef9e1632a3d57abe47144810a3..700624b823c2605501336779251daf6320c975e0 100644 (file)
@@ -38,7 +38,8 @@ public final class WebSocketSessionHandler implements StreamSessionHandler {
     private static final byte[] PING_PAYLOAD = "ping".getBytes(Charset.defaultCharset());
 
     private final ScheduledExecutorService executorService;
-    private final BaseListenerInterface listener;
+    // FIXME: this really should include formatter etc.
+    private final AbstractStream<?> listener;
     private final int maximumFragmentLength;
     private final int heartbeatInterval;
 
@@ -61,7 +62,7 @@ public final class WebSocketSessionHandler implements StreamSessionHandler {
      * @param heartbeatInterval     Interval in milliseconds of sending of ping control frames to remote endpoint
      *                              to keep session up. Ping control frames are disabled if this parameter is set to 0.
      */
-    WebSocketSessionHandler(final ScheduledExecutorService executorService, final BaseListenerInterface listener,
+    WebSocketSessionHandler(final ScheduledExecutorService executorService, final AbstractStream<?> listener,
             final int maximumFragmentLength, final int heartbeatInterval) {
         this.executorService = executorService;
         this.listener = listener;
index 316f1d781d08359b06c14711f9460b12b735192d..a802f7a1cca369d6abaf85b151a2ffcbecb70544 100644 (file)
@@ -37,7 +37,7 @@ public class SSESessionHandlerTest {
     @Mock
     private ScheduledExecutorService executorService;
     @Mock
-    private BaseListenerInterface listener;
+    private AbstractStream<?> listener;
     @Mock
     private ScheduledFuture<?> pingFuture;
     @Mock
index b8c1da9fcbcb68c4817ab040bdbe1e5178fa4460..631ef9c46c9bae9928f953984408af5607ae3bc5 100644 (file)
@@ -31,17 +31,16 @@ import org.junit.Test;
 import org.mockito.ArgumentCaptor;
 
 public class WebSocketSessionHandlerTest {
-
     private static final class WebSocketTestSessionState {
-        private final BaseListenerInterface listener;
+        private final AbstractStream<?> listener;
         private final ScheduledExecutorService executorService;
         private final WebSocketSessionHandler webSocketSessionHandler;
         private final int heartbeatInterval;
         private final int maxFragmentSize;
         private final ScheduledFuture pingFuture;
 
-        private WebSocketTestSessionState(final int maxFragmentSize, final int heartbeatInterval) {
-            listener = mock(BaseListenerInterface.class);
+        WebSocketTestSessionState(final int maxFragmentSize, final int heartbeatInterval) {
+            listener = mock(AbstractStream.class);
             executorService = mock(ScheduledExecutorService.class);
             this.heartbeatInterval = heartbeatInterval;
             this.maxFragmentSize = maxFragmentSize;