Remove BaseListenerInterface
[netconf.git] / restconf / restconf-nb / src / main / java / org / opendaylight / restconf / nb / rfc8040 / streams / ListenersBroker.java
index fc2221c3b2d9fd774f0664fa37140603f793feea..d86a94a43e95fcee56a53f8a6a2380af3c5da9c5 100644 (file)
@@ -26,11 +26,9 @@ import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteOperations;
 import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteTransaction;
 import org.opendaylight.mdsal.dom.api.DOMMountPointService;
 import org.opendaylight.restconf.common.errors.RestconfDocumentedException;
-import org.opendaylight.restconf.nb.rfc8040.NotificationQueryParams;
+import org.opendaylight.restconf.nb.rfc8040.ReceiveEventsParams;
 import org.opendaylight.restconf.nb.rfc8040.URLConstants;
-import org.opendaylight.restconf.nb.rfc8040.monitoring.RestconfStateStreams;
 import org.opendaylight.restconf.nb.rfc8040.rests.services.impl.RestconfStreamsSubscriptionServiceImpl.HandlersHolder;
-import org.opendaylight.restconf.nb.rfc8040.rests.utils.RestconfStreamsConstants;
 import org.opendaylight.restconf.nb.rfc8040.utils.parser.IdentifierCodec;
 import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev140708.CreateDataChangeEventSubscriptionInput1.Scope;
 import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev140708.NotificationOutputTypeGrouping.NotificationOutputType;
@@ -60,7 +58,7 @@ public abstract sealed class ListenersBroker {
         @Override
         public URI prepareUriByStreamName(final UriInfo uriInfo, final String streamName) {
             return uriInfo.getBaseUriBuilder()
-                .replacePath(URLConstants.BASE_PATH + '/' + URLConstants.SSE_SUBPATH + '/' + streamName)
+                .replacePath(URLConstants.BASE_PATH + '/' + URLConstants.STREAMS_SUBPATH + '/' + streamName)
                 .build();
         }
     }
@@ -80,7 +78,7 @@ public abstract sealed class ListenersBroker {
 
             return uriInfo.getBaseUriBuilder()
                 .scheme(scheme)
-                .replacePath(URLConstants.BASE_PATH + '/' + streamName)
+                .replacePath(URLConstants.BASE_PATH + '/' + URLConstants.STREAMS_SUBPATH + '/' + streamName)
                 .build();
         }
     }
@@ -162,7 +160,7 @@ public abstract sealed class ListenersBroker {
      * @return {@link NotificationListenerAdapter} or {@link ListenerAdapter} object wrapped in {@link Optional}
      *     or {@link Optional#empty()} if listener with specified stream name doesn't exist.
      */
-    public final @Nullable BaseListenerInterface listenerFor(final String streamName) {
+    public final @Nullable AbstractStream<?> listenerFor(final String streamName) {
         if (streamName.startsWith(RestconfStreamsConstants.NOTIFICATION_STREAM)) {
             return notificationListenerFor(streamName);
         } else if (streamName.startsWith(RestconfStreamsConstants.DATA_SUBSCRIPTION)) {
@@ -196,7 +194,7 @@ public abstract sealed class ListenersBroker {
         final long stamp = dataChangeListenersLock.writeLock();
         try {
             return dataChangeListeners.computeIfAbsent(sb.toString(),
-                streamName -> new ListenerAdapter(datastore, path, streamName, outputType, this));
+                streamName -> new ListenerAdapter(streamName, outputType, this, datastore, path));
         } finally {
             dataChangeListenersLock.unlockWrite(stamp);
         }
@@ -241,7 +239,7 @@ public abstract sealed class ListenersBroker {
         final long stamp = notificationListenersLock.writeLock();
         try {
             return notificationListeners.computeIfAbsent(sb.toString(),
-                streamName -> new NotificationListenerAdapter(notifications, streamName, outputType, this));
+                streamName -> new NotificationListenerAdapter(streamName, outputType, this, notifications));
         } finally {
             notificationListenersLock.unlockWrite(stamp);
         }
@@ -266,8 +264,8 @@ public abstract sealed class ListenersBroker {
         final long stamp = deviceNotificationListenersLock.writeLock();
         try {
             return deviceNotificationListeners.computeIfAbsent(sb.toString(),
-                streamName -> new DeviceNotificationListenerAdaptor(streamName, outputType, refSchemaCtx,
-                    mountPointService, path, this));
+                streamName -> new DeviceNotificationListenerAdaptor(streamName, outputType, this, refSchemaCtx,
+                    mountPointService, path));
         } finally {
             deviceNotificationListenersLock.unlockWrite(stamp);
         }
@@ -427,14 +425,14 @@ public abstract sealed class ListenersBroker {
     /**
      * Removal and closing of general listener (data-change or notification listener).
      *
-     * @param listener Listener to be closed and removed from cache.
+     * @param stream Stream to be closed and removed from cache.
      */
-    final void removeAndCloseListener(final BaseListenerInterface listener) {
-        requireNonNull(listener);
-        if (listener instanceof ListenerAdapter) {
-            removeAndCloseDataChangeListener((ListenerAdapter) listener);
-        } else if (listener instanceof NotificationListenerAdapter) {
-            removeAndCloseNotificationListener((NotificationListenerAdapter) listener);
+    final void removeAndCloseListener(final AbstractStream<?> stream) {
+        requireNonNull(stream);
+        if (stream instanceof ListenerAdapter dataChange) {
+            removeAndCloseDataChangeListener(dataChange);
+        } else if (stream instanceof NotificationListenerAdapter notification) {
+            removeAndCloseNotificationListener(notification);
         }
     }
 
@@ -445,7 +443,7 @@ public abstract sealed class ListenersBroker {
      * @param uri URI for creation of stream name.
      * @return String representation of stream name.
      */
-    public static String createStreamNameFromUri(final String uri) {
+    private static String createStreamNameFromUri(final String uri) {
         String result = requireNonNull(uri);
         while (true) {
             if (result.startsWith(URLConstants.BASE_PATH)) {
@@ -482,7 +480,7 @@ public abstract sealed class ListenersBroker {
      * @return Stream location for listening.
      */
     public final @NonNull URI subscribeToYangStream(final String identifier, final UriInfo uriInfo,
-            final NotificationQueryParams notificationQueryParams, final HandlersHolder handlersHolder) {
+            final ReceiveEventsParams notificationQueryParams, final HandlersHolder handlersHolder) {
         final String streamName = createStreamNameFromUri(identifier);
         if (isNullOrEmpty(streamName)) {
             throw new RestconfDocumentedException("Stream name is empty.", ErrorType.PROTOCOL, ErrorTag.INVALID_VALUE);
@@ -500,8 +498,7 @@ public abstract sealed class ListenersBroker {
         final DOMDataBroker dataBroker = handlersHolder.dataBroker();
         notificationListenerAdapter.setCloseVars(dataBroker, handlersHolder.databindProvider());
         final MapEntryNode mapToStreams = RestconfStateStreams.notificationStreamEntry(streamName,
-            notificationListenerAdapter.qnames(), notificationListenerAdapter.getStart(),
-            notificationListenerAdapter.getOutputType(), uri);
+            notificationListenerAdapter.qnames(), notificationListenerAdapter.getOutputType(), uri);
 
         // FIXME: how does this correlate with the transaction notificationListenerAdapter.close() will do?
         final DOMDataTreeWriteTransaction writeTransaction = dataBroker.newWriteOnlyTransaction();
@@ -521,7 +518,7 @@ public abstract sealed class ListenersBroker {
      * @return Location for listening.
      */
     public final URI subscribeToDataStream(final String identifier, final UriInfo uriInfo,
-            final NotificationQueryParams notificationQueryParams, final HandlersHolder handlersHolder) {
+            final ReceiveEventsParams notificationQueryParams, final HandlersHolder handlersHolder) {
         final var streamName = createStreamNameFromUri(identifier);
         final var listener = dataChangeListenerFor(streamName);
         if (listener == null) {
@@ -541,7 +538,7 @@ public abstract sealed class ListenersBroker {
         final var serializedPath = IdentifierCodec.serialize(listener.getPath(), schemaContext);
 
         final var mapToStreams = RestconfStateStreams.dataChangeStreamEntry(listener.getPath(),
-                listener.getStart(), listener.getOutputType(), uri, schemaContext, serializedPath);
+                listener.getOutputType(), uri, schemaContext, serializedPath);
         final var writeTransaction = dataBroker.newWriteOnlyTransaction();
         writeDataToDS(writeTransaction, mapToStreams);
         submitData(writeTransaction);