Remove RestconfError.ErrorType
[netconf.git] / restconf / restconf-nb-rfc8040 / src / main / java / org / opendaylight / restconf / nb / rfc8040 / rests / services / impl / SubscribeToStreamUtil.java
index ee96e3e36f6152c1773a657d436e1b1b80c6b37f..b3aeb075cdd27199719ce6436819af70c3021291 100644 (file)
@@ -7,32 +7,25 @@
  */
 package org.opendaylight.restconf.nb.rfc8040.rests.services.impl;
 
-import com.google.common.base.Preconditions;
-import com.google.common.base.Strings;
+import static com.google.common.base.Strings.isNullOrEmpty;
+
 import java.net.URI;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.Optional;
 import java.util.concurrent.ExecutionException;
 import javax.ws.rs.core.UriBuilder;
 import javax.ws.rs.core.UriInfo;
 import org.eclipse.jdt.annotation.NonNull;
 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
 import org.opendaylight.mdsal.dom.api.DOMDataBroker;
-import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeService;
-import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
-import org.opendaylight.mdsal.dom.api.DOMDataTreeReadWriteTransaction;
-import org.opendaylight.mdsal.dom.api.DOMNotificationListener;
-import org.opendaylight.mdsal.dom.api.DOMTransactionChain;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteOperations;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteTransaction;
 import org.opendaylight.restconf.common.errors.RestconfDocumentedException;
 import org.opendaylight.restconf.common.errors.RestconfError.ErrorTag;
-import org.opendaylight.restconf.common.errors.RestconfError.ErrorType;
-import org.opendaylight.restconf.common.util.DataChangeScope;
-import org.opendaylight.restconf.nb.rfc8040.Rfc8040.MonitoringModule;
-import org.opendaylight.restconf.nb.rfc8040.handlers.NotificationServiceHandler;
+import org.opendaylight.restconf.nb.rfc8040.Rfc8040;
+import org.opendaylight.restconf.nb.rfc8040.handlers.SchemaContextHandler;
 import org.opendaylight.restconf.nb.rfc8040.rests.services.impl.RestconfStreamsSubscriptionServiceImpl.HandlersHolder;
 import org.opendaylight.restconf.nb.rfc8040.rests.services.impl.RestconfStreamsSubscriptionServiceImpl.NotificationQueryParams;
-import org.opendaylight.restconf.nb.rfc8040.rests.utils.ResolveEnumUtil;
 import org.opendaylight.restconf.nb.rfc8040.rests.utils.RestconfStreamsConstants;
 import org.opendaylight.restconf.nb.rfc8040.streams.listeners.ListenerAdapter;
 import org.opendaylight.restconf.nb.rfc8040.streams.listeners.ListenersBroker;
@@ -40,11 +33,9 @@ import org.opendaylight.restconf.nb.rfc8040.streams.listeners.NotificationListen
 import org.opendaylight.restconf.nb.rfc8040.utils.RestconfConstants;
 import org.opendaylight.restconf.nb.rfc8040.utils.mapping.RestconfMappingNodeUtil;
 import org.opendaylight.restconf.nb.rfc8040.utils.parser.IdentifierCodec;
-import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.yang.common.ErrorType;
 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
 import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
-import org.opendaylight.yangtools.yang.model.api.Module;
-import org.opendaylight.yangtools.yang.model.api.stmt.SchemaNodeIdentifier.Absolute;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -53,7 +44,7 @@ import org.slf4j.LoggerFactory;
  */
 abstract class SubscribeToStreamUtil {
     /**
-     * Implementation of {@link UrlResolver} for Server-sent events.
+     * Implementation of SubscribeToStreamUtil for Server-sent events.
      */
     private static final class ServerSentEvents extends SubscribeToStreamUtil {
         static final ServerSentEvents INSTANCE = new ServerSentEvents();
@@ -67,7 +58,7 @@ abstract class SubscribeToStreamUtil {
     }
 
     /**
-     * Implementation of {@link UrlResolver} for Web sockets.
+     * Implementation of SubscribeToStreamUtil for Web sockets.
      */
     private static final class WebSockets extends SubscribeToStreamUtil {
         static final WebSockets INSTANCE = new WebSockets();
@@ -122,48 +113,40 @@ abstract class SubscribeToStreamUtil {
      * @param uriInfo                 URI information.
      * @param notificationQueryParams Query parameters of notification.
      * @param handlersHolder          Holder of handlers for notifications.
-     * @param urlResolver             Resolver for proper implementation. Possibilities is WS or SSE.
      * @return Stream location for listening.
      */
     final @NonNull URI subscribeToYangStream(final String identifier, final UriInfo uriInfo,
             final NotificationQueryParams notificationQueryParams, final HandlersHolder handlersHolder) {
         final String streamName = ListenersBroker.createStreamNameFromUri(identifier);
-        if (Strings.isNullOrEmpty(streamName)) {
+        if (isNullOrEmpty(streamName)) {
             throw new RestconfDocumentedException("Stream name is empty.", ErrorType.PROTOCOL, ErrorTag.INVALID_VALUE);
         }
-        final Optional<NotificationListenerAdapter> notificationListenerAdapter =
-                ListenersBroker.getInstance().getNotificationListenerFor(streamName);
 
-        if (!notificationListenerAdapter.isPresent()) {
-            throw new RestconfDocumentedException(String.format(
-                    "Stream with name %s was not found.", streamName),
-                    ErrorType.PROTOCOL,
-                    ErrorTag.UNKNOWN_ELEMENT);
-        }
+        final NotificationListenerAdapter notificationListenerAdapter = ListenersBroker.getInstance()
+            .getNotificationListenerFor(streamName)
+            .orElseThrow(() -> new RestconfDocumentedException(
+                String.format("Stream with name %s was not found.", streamName),
+                ErrorType.PROTOCOL, ErrorTag.UNKNOWN_ELEMENT));
 
-        final DOMTransactionChain transactionChain = handlersHolder.getTransactionChainHandler().get();
-        final DOMDataTreeReadWriteTransaction writeTransaction = transactionChain.newReadWriteTransaction();
         final EffectiveModelContext schemaContext = handlersHolder.getSchemaHandler().get();
-
         final URI uri = prepareUriByStreamName(uriInfo, streamName);
-        registerToListenNotification(
-                notificationListenerAdapter.get(), handlersHolder.getNotificationServiceHandler());
-        notificationListenerAdapter.get().setQueryParams(
+        notificationListenerAdapter.listen(handlersHolder.getNotificationServiceHandler());
+        notificationListenerAdapter.setQueryParams(
                 notificationQueryParams.getStart(),
                 notificationQueryParams.getStop().orElse(null),
                 notificationQueryParams.getFilter().orElse(null),
                 false, notificationQueryParams.isSkipNotificationData());
-        notificationListenerAdapter.get().setCloseVars(
-                handlersHolder.getTransactionChainHandler(), handlersHolder.getSchemaHandler());
+        final DOMDataBroker dataBroker = handlersHolder.getDataBroker();
+        notificationListenerAdapter.setCloseVars(dataBroker, handlersHolder.getSchemaHandler());
         final MapEntryNode mapToStreams = RestconfMappingNodeUtil.mapYangNotificationStreamByIetfRestconfMonitoring(
-                    notificationListenerAdapter.get().getSchemaPath().lastNodeIdentifier(),
+                    notificationListenerAdapter.getSchemaPath().lastNodeIdentifier(),
                     schemaContext.getNotifications(), notificationQueryParams.getStart(),
-                    notificationListenerAdapter.get().getOutputType(), uri, getMonitoringModule(schemaContext));
-        writeDataToDS(schemaContext,
-            notificationListenerAdapter.get().getSchemaPath().lastNodeIdentifier().getLocalName(), writeTransaction,
-            mapToStreams);
+                    notificationListenerAdapter.getOutputType(), uri);
+
+        // FIXME: how does this correlate with the transaction notificationListenerAdapter.close() will do?
+        final DOMDataTreeWriteTransaction writeTransaction = dataBroker.newWriteOnlyTransaction();
+        writeDataToDS(writeTransaction, mapToStreams);
         submitData(writeTransaction);
-        transactionChain.close();
         return uri;
     }
 
@@ -180,65 +163,59 @@ abstract class SubscribeToStreamUtil {
     final URI subscribeToDataStream(final String identifier, final UriInfo uriInfo,
             final NotificationQueryParams notificationQueryParams, final HandlersHolder handlersHolder) {
         final Map<String, String> mapOfValues = mapValuesFromUri(identifier);
-        final LogicalDatastoreType datastoreType = parseURIEnum(
-                LogicalDatastoreType.class,
-                mapOfValues.get(RestconfStreamsConstants.DATASTORE_PARAM_NAME));
-        if (datastoreType == null) {
-            final String message = "Stream name doesn't contain datastore value (pattern /datastore=)";
+
+        final String datastoreParam = mapOfValues.get(RestconfStreamsConstants.DATASTORE_PARAM_NAME);
+        if (isNullOrEmpty(datastoreParam)) {
+            final String message = "Stream name does not contain datastore value (pattern /datastore=)";
             LOG.debug(message);
             throw new RestconfDocumentedException(message, ErrorType.APPLICATION, ErrorTag.MISSING_ATTRIBUTE);
         }
 
-        final DataChangeScope scope = parseURIEnum(
-                DataChangeScope.class,
-                mapOfValues.get(RestconfStreamsConstants.SCOPE_PARAM_NAME));
-        if (scope == null) {
-            final String message = "Stream name doesn't contains datastore value (pattern /scope=)";
+        // FIXME: this is kept only for compatibility, we are not using this parameter
+        if (isNullOrEmpty(mapOfValues.get(RestconfStreamsConstants.SCOPE_PARAM_NAME))) {
+            final String message = "Stream name does not contain scope value (pattern /scope=)";
             LOG.warn(message);
             throw new RestconfDocumentedException(message, ErrorType.APPLICATION, ErrorTag.MISSING_ATTRIBUTE);
         }
 
         final String streamName = ListenersBroker.createStreamNameFromUri(identifier);
-        final Optional<ListenerAdapter> listener = ListenersBroker.getInstance().getDataChangeListenerFor(streamName);
-        Preconditions.checkArgument(listener.isPresent(), "Listener doesn't exist : " + streamName);
 
-        listener.get().setQueryParams(
+        final ListenerAdapter listener = ListenersBroker.getInstance()
+            .getDataChangeListenerFor(streamName)
+            .orElseThrow(() -> new IllegalArgumentException("Listener does not exist : " + streamName));
+
+        listener.setQueryParams(
                 notificationQueryParams.getStart(),
                 notificationQueryParams.getStop().orElse(null),
                 notificationQueryParams.getFilter().orElse(null),
                 false, notificationQueryParams.isSkipNotificationData());
-        listener.get().setCloseVars(handlersHolder.getTransactionChainHandler(), handlersHolder.getSchemaHandler());
-        registration(datastoreType, listener.get(), handlersHolder.getDomDataBrokerHandler().get());
+
+        final DOMDataBroker dataBroker = handlersHolder.getDataBroker();
+        final SchemaContextHandler schemaHandler = handlersHolder.getSchemaHandler();
+        listener.setCloseVars(dataBroker, schemaHandler);
+        listener.listen(dataBroker, LogicalDatastoreType.valueOf(datastoreParam));
 
         final URI uri = prepareUriByStreamName(uriInfo, streamName);
-        final DOMTransactionChain transactionChain = handlersHolder.getTransactionChainHandler().get();
-        final DOMDataTreeReadWriteTransaction writeTransaction = transactionChain.newReadWriteTransaction();
-        final EffectiveModelContext schemaContext = handlersHolder.getSchemaHandler().get();
+        final EffectiveModelContext schemaContext = schemaHandler.get();
+        final String serializedPath = IdentifierCodec.serialize(listener.getPath(), schemaContext);
 
         final MapEntryNode mapToStreams =
-            RestconfMappingNodeUtil.mapDataChangeNotificationStreamByIetfRestconfMonitoring(listener.get().getPath(),
-                notificationQueryParams.getStart(), listener.get().getOutputType(), uri,
-                getMonitoringModule(schemaContext), schemaContext);
-        writeDataToDS(schemaContext, listener.get().getPath().getLastPathArgument().getNodeType().getLocalName(),
-                writeTransaction, mapToStreams);
+            RestconfMappingNodeUtil.mapDataChangeNotificationStreamByIetfRestconfMonitoring(listener.getPath(),
+                notificationQueryParams.getStart(), listener.getOutputType(), uri, schemaContext, serializedPath);
+        final DOMDataTreeWriteTransaction writeTransaction = dataBroker.newWriteOnlyTransaction();
+        writeDataToDS(writeTransaction, mapToStreams);
         submitData(writeTransaction);
-        transactionChain.close();
         return uri;
     }
 
-    static Module getMonitoringModule(final EffectiveModelContext schemaContext) {
-        return schemaContext.findModule(MonitoringModule.MODULE_QNAME).orElse(null);
-    }
-
-    private static void writeDataToDS(final EffectiveModelContext schemaContext, final String name,
-            final DOMDataTreeReadWriteTransaction readWriteTransaction, final MapEntryNode mapToStreams) {
-        readWriteTransaction.merge(LogicalDatastoreType.OPERATIONAL,
-            // FIXME: do not use IdentifierCodec here
-            IdentifierCodec.deserialize(MonitoringModule.PATH_TO_STREAM_WITHOUT_KEY + name, schemaContext),
+    // FIXME: callers are utter duplicates, refactor them
+    private static void writeDataToDS(final DOMDataTreeWriteOperations tx, final MapEntryNode mapToStreams) {
+        // FIXME: use put() here
+        tx.merge(LogicalDatastoreType.OPERATIONAL, Rfc8040.restconfStateStreamPath(mapToStreams.getIdentifier()),
             mapToStreams);
     }
 
-    private static void submitData(final DOMDataTreeReadWriteTransaction readWriteTransaction) {
+    private static void submitData(final DOMDataTreeWriteTransaction readWriteTransaction) {
         try {
             readWriteTransaction.commit().get();
         } catch (final InterruptedException | ExecutionException e) {
@@ -262,55 +239,4 @@ abstract class SubscribeToStreamUtil {
         }
         return result;
     }
-
-    /**
-     * Register data change listener in DOM data broker and set it to listener on stream.
-     *
-     * @param datastore     {@link LogicalDatastoreType}
-     * @param listener      listener on specific stream
-     * @param domDataBroker data broker for register data change listener
-     */
-    private static void registration(final LogicalDatastoreType datastore, final ListenerAdapter listener,
-            final DOMDataBroker domDataBroker) {
-        if (listener.isListening()) {
-            return;
-        }
-
-        final DOMDataTreeChangeService changeService = domDataBroker.getExtensions()
-                .getInstance(DOMDataTreeChangeService.class);
-        if (changeService == null) {
-            throw new UnsupportedOperationException("DOMDataBroker does not support the DOMDataTreeChangeService");
-        }
-
-        final DOMDataTreeIdentifier root = new DOMDataTreeIdentifier(datastore, listener.getPath());
-        final ListenerRegistration<ListenerAdapter> registration =
-                changeService.registerDataTreeChangeListener(root, listener);
-        listener.setRegistration(registration);
-    }
-
-    private static void registerToListenNotification(final NotificationListenerAdapter listener,
-            final NotificationServiceHandler notificationServiceHandler) {
-        if (listener.isListening()) {
-            return;
-        }
-
-        final Absolute path = listener.getSchemaPath();
-        final ListenerRegistration<DOMNotificationListener> registration =
-                notificationServiceHandler.get().registerNotificationListener(listener, path);
-        listener.setRegistration(registration);
-    }
-
-    /**
-     * Parse out enumeration from URI.
-     *
-     * @param clazz Target enumeration type.
-     * @param value String representation of enumeration value.
-     * @return Parsed enumeration type.
-     */
-    private static <T> T parseURIEnum(final Class<T> clazz, final String value) {
-        if (value == null || value.equals("")) {
-            return null;
-        }
-        return ResolveEnumUtil.resolveEnum(clazz, value);
-    }
 }