Reconstruct inference stack during normalization
[netconf.git] / restconf / restconf-nb-rfc8040 / src / main / java / org / opendaylight / restconf / nb / rfc8040 / rests / services / impl / SubscribeToStreamUtil.java
index 04e91f2af81393ad18a3869b75d9f8b73b2e2688..2019aff332720709ecd41264b09e136b4e768b74 100644 (file)
@@ -18,19 +18,13 @@ import javax.ws.rs.core.UriInfo;
 import org.eclipse.jdt.annotation.NonNull;
 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
 import org.opendaylight.mdsal.dom.api.DOMDataBroker;
-import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeService;
-import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
-import org.opendaylight.mdsal.dom.api.DOMDataTreeReadWriteTransaction;
 import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteOperations;
-import org.opendaylight.mdsal.dom.api.DOMNotificationListener;
-import org.opendaylight.mdsal.dom.api.DOMNotificationService;
-import org.opendaylight.mdsal.dom.api.DOMTransactionChain;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteTransaction;
 import org.opendaylight.restconf.common.errors.RestconfDocumentedException;
-import org.opendaylight.restconf.common.errors.RestconfError.ErrorTag;
-import org.opendaylight.restconf.common.errors.RestconfError.ErrorType;
+import org.opendaylight.restconf.nb.rfc8040.NotificationQueryParams;
 import org.opendaylight.restconf.nb.rfc8040.Rfc8040;
+import org.opendaylight.restconf.nb.rfc8040.handlers.SchemaContextHandler;
 import org.opendaylight.restconf.nb.rfc8040.rests.services.impl.RestconfStreamsSubscriptionServiceImpl.HandlersHolder;
-import org.opendaylight.restconf.nb.rfc8040.rests.services.impl.RestconfStreamsSubscriptionServiceImpl.NotificationQueryParams;
 import org.opendaylight.restconf.nb.rfc8040.rests.utils.RestconfStreamsConstants;
 import org.opendaylight.restconf.nb.rfc8040.streams.listeners.ListenerAdapter;
 import org.opendaylight.restconf.nb.rfc8040.streams.listeners.ListenersBroker;
@@ -38,10 +32,10 @@ import org.opendaylight.restconf.nb.rfc8040.streams.listeners.NotificationListen
 import org.opendaylight.restconf.nb.rfc8040.utils.RestconfConstants;
 import org.opendaylight.restconf.nb.rfc8040.utils.mapping.RestconfMappingNodeUtil;
 import org.opendaylight.restconf.nb.rfc8040.utils.parser.IdentifierCodec;
-import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.yang.common.ErrorTag;
+import org.opendaylight.yangtools.yang.common.ErrorType;
 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
 import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
-import org.opendaylight.yangtools.yang.model.api.stmt.SchemaNodeIdentifier.Absolute;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -134,26 +128,21 @@ abstract class SubscribeToStreamUtil {
                 String.format("Stream with name %s was not found.", streamName),
                 ErrorType.PROTOCOL, ErrorTag.UNKNOWN_ELEMENT));
 
-        final DOMTransactionChain transactionChain = handlersHolder.getTransactionChainHandler().get();
-        final DOMDataTreeReadWriteTransaction writeTransaction = transactionChain.newReadWriteTransaction();
         final EffectiveModelContext schemaContext = handlersHolder.getSchemaHandler().get();
-
         final URI uri = prepareUriByStreamName(uriInfo, streamName);
-        registerToListenNotification(notificationListenerAdapter, handlersHolder.getNotificationServiceHandler());
-        notificationListenerAdapter.setQueryParams(
-                notificationQueryParams.getStart(),
-                notificationQueryParams.getStop().orElse(null),
-                notificationQueryParams.getFilter().orElse(null),
-                false, notificationQueryParams.isSkipNotificationData());
-        notificationListenerAdapter.setCloseVars(
-                handlersHolder.getTransactionChainHandler(), handlersHolder.getSchemaHandler());
+        notificationListenerAdapter.setQueryParams(notificationQueryParams);
+        notificationListenerAdapter.listen(handlersHolder.getNotificationServiceHandler());
+        final DOMDataBroker dataBroker = handlersHolder.getDataBroker();
+        notificationListenerAdapter.setCloseVars(dataBroker, handlersHolder.getSchemaHandler());
         final MapEntryNode mapToStreams = RestconfMappingNodeUtil.mapYangNotificationStreamByIetfRestconfMonitoring(
                     notificationListenerAdapter.getSchemaPath().lastNodeIdentifier(),
-                    schemaContext.getNotifications(), notificationQueryParams.getStart(),
+                    schemaContext.getNotifications(), notificationListenerAdapter.getStart(),
                     notificationListenerAdapter.getOutputType(), uri);
+
+        // FIXME: how does this correlate with the transaction notificationListenerAdapter.close() will do?
+        final DOMDataTreeWriteTransaction writeTransaction = dataBroker.newWriteOnlyTransaction();
         writeDataToDS(writeTransaction, mapToStreams);
         submitData(writeTransaction);
-        transactionChain.close();
         return uri;
     }
 
@@ -186,33 +175,26 @@ abstract class SubscribeToStreamUtil {
         }
 
         final String streamName = ListenersBroker.createStreamNameFromUri(identifier);
+        final ListenerAdapter listener = ListenersBroker.getInstance().getDataChangeListenerFor(streamName)
+            .orElseThrow(() -> new RestconfDocumentedException("No listener found for stream " + streamName,
+                ErrorType.APPLICATION, ErrorTag.DATA_MISSING));
+        listener.setQueryParams(notificationQueryParams);
 
-        final ListenerAdapter listener = ListenersBroker.getInstance()
-            .getDataChangeListenerFor(streamName)
-            .orElseThrow(() -> new IllegalArgumentException("Listener does not exist : " + streamName));
-
-        listener.setQueryParams(
-                notificationQueryParams.getStart(),
-                notificationQueryParams.getStop().orElse(null),
-                notificationQueryParams.getFilter().orElse(null),
-                false, notificationQueryParams.isSkipNotificationData());
-        listener.setCloseVars(handlersHolder.getTransactionChainHandler(), handlersHolder.getSchemaHandler());
-
-        final LogicalDatastoreType datastoreType = LogicalDatastoreType.valueOf(datastoreParam);
-        registration(datastoreType, listener, handlersHolder.getDataBroker());
+        final DOMDataBroker dataBroker = handlersHolder.getDataBroker();
+        final SchemaContextHandler schemaHandler = handlersHolder.getSchemaHandler();
+        listener.setCloseVars(dataBroker, schemaHandler);
+        listener.listen(dataBroker, LogicalDatastoreType.valueOf(datastoreParam));
 
         final URI uri = prepareUriByStreamName(uriInfo, streamName);
-        final DOMTransactionChain transactionChain = handlersHolder.getTransactionChainHandler().get();
-        final DOMDataTreeReadWriteTransaction writeTransaction = transactionChain.newReadWriteTransaction();
-        final EffectiveModelContext schemaContext = handlersHolder.getSchemaHandler().get();
+        final EffectiveModelContext schemaContext = schemaHandler.get();
         final String serializedPath = IdentifierCodec.serialize(listener.getPath(), schemaContext);
 
         final MapEntryNode mapToStreams =
             RestconfMappingNodeUtil.mapDataChangeNotificationStreamByIetfRestconfMonitoring(listener.getPath(),
-                notificationQueryParams.getStart(), listener.getOutputType(), uri, schemaContext, serializedPath);
+                listener.getStart(), listener.getOutputType(), uri, schemaContext, serializedPath);
+        final DOMDataTreeWriteTransaction writeTransaction = dataBroker.newWriteOnlyTransaction();
         writeDataToDS(writeTransaction, mapToStreams);
         submitData(writeTransaction);
-        transactionChain.close();
         return uri;
     }
 
@@ -223,7 +205,7 @@ abstract class SubscribeToStreamUtil {
             mapToStreams);
     }
 
-    private static void submitData(final DOMDataTreeReadWriteTransaction readWriteTransaction) {
+    private static void submitData(final DOMDataTreeWriteTransaction readWriteTransaction) {
         try {
             readWriteTransaction.commit().get();
         } catch (final InterruptedException | ExecutionException e) {
@@ -247,42 +229,4 @@ abstract class SubscribeToStreamUtil {
         }
         return result;
     }
-
-    /**
-     * Register data change listener in DOM data broker and set it to listener on stream.
-     *
-     * @param datastore     {@link LogicalDatastoreType}
-     * @param listener      listener on specific stream
-     * @param domDataBroker data broker for register data change listener
-     */
-    private static void registration(final LogicalDatastoreType datastore, final ListenerAdapter listener,
-            final DOMDataBroker domDataBroker) {
-        if (listener.isListening()) {
-            return;
-        }
-
-        final DOMDataTreeChangeService changeService = domDataBroker.getExtensions()
-                .getInstance(DOMDataTreeChangeService.class);
-        if (changeService == null) {
-            throw new UnsupportedOperationException("DOMDataBroker does not support the DOMDataTreeChangeService");
-        }
-
-        final DOMDataTreeIdentifier root = new DOMDataTreeIdentifier(datastore, listener.getPath());
-        final ListenerRegistration<ListenerAdapter> registration =
-                changeService.registerDataTreeChangeListener(root, listener);
-        listener.setRegistration(registration);
-    }
-
-    // FIXME: this method should be in NotificationListenerAdapter
-    private static void registerToListenNotification(final NotificationListenerAdapter listener,
-            final DOMNotificationService notificationService) {
-        if (listener.isListening()) {
-            return;
-        }
-
-        final Absolute path = listener.getSchemaPath();
-        final ListenerRegistration<DOMNotificationListener> registration =
-                notificationService.registerNotificationListener(listener, path);
-        listener.setRegistration(registration);
-    }
 }