*/
package org.opendaylight.restconf.nb.rfc8040.rests.services.impl;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Strings;
+import static com.google.common.base.Strings.isNullOrEmpty;
+
import java.net.URI;
import java.util.HashMap;
import java.util.Map;
-import java.util.Optional;
import java.util.concurrent.ExecutionException;
import javax.ws.rs.core.UriBuilder;
import javax.ws.rs.core.UriInfo;
import org.eclipse.jdt.annotation.NonNull;
import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
import org.opendaylight.mdsal.dom.api.DOMDataBroker;
-import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeService;
-import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
-import org.opendaylight.mdsal.dom.api.DOMDataTreeReadOperations;
-import org.opendaylight.mdsal.dom.api.DOMDataTreeReadWriteTransaction;
-import org.opendaylight.mdsal.dom.api.DOMNotificationListener;
-import org.opendaylight.mdsal.dom.api.DOMTransactionChain;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteOperations;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteTransaction;
import org.opendaylight.restconf.common.errors.RestconfDocumentedException;
-import org.opendaylight.restconf.common.errors.RestconfError.ErrorTag;
-import org.opendaylight.restconf.common.errors.RestconfError.ErrorType;
-import org.opendaylight.restconf.common.util.DataChangeScope;
-import org.opendaylight.restconf.nb.rfc8040.Rfc8040.MonitoringModule;
-import org.opendaylight.restconf.nb.rfc8040.handlers.NotificationServiceHandler;
+import org.opendaylight.restconf.nb.rfc8040.NotificationQueryParams;
+import org.opendaylight.restconf.nb.rfc8040.Rfc8040;
+import org.opendaylight.restconf.nb.rfc8040.handlers.SchemaContextHandler;
import org.opendaylight.restconf.nb.rfc8040.rests.services.impl.RestconfStreamsSubscriptionServiceImpl.HandlersHolder;
-import org.opendaylight.restconf.nb.rfc8040.rests.services.impl.RestconfStreamsSubscriptionServiceImpl.NotificationQueryParams;
-import org.opendaylight.restconf.nb.rfc8040.rests.utils.ResolveEnumUtil;
import org.opendaylight.restconf.nb.rfc8040.rests.utils.RestconfStreamsConstants;
import org.opendaylight.restconf.nb.rfc8040.streams.listeners.ListenerAdapter;
import org.opendaylight.restconf.nb.rfc8040.streams.listeners.ListenersBroker;
import org.opendaylight.restconf.nb.rfc8040.utils.RestconfConstants;
import org.opendaylight.restconf.nb.rfc8040.utils.mapping.RestconfMappingNodeUtil;
import org.opendaylight.restconf.nb.rfc8040.utils.parser.IdentifierCodec;
-import org.opendaylight.yangtools.concepts.ListenerRegistration;
-import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.common.ErrorTag;
+import org.opendaylight.yangtools.yang.common.ErrorType;
+import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
-import org.opendaylight.yangtools.yang.model.api.Module;
-import org.opendaylight.yangtools.yang.model.api.SchemaContext;
-import org.opendaylight.yangtools.yang.model.api.SchemaPath;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
*/
abstract class SubscribeToStreamUtil {
/**
- * Implementation of {@link UrlResolver} for Server-sent events.
+ * Implementation of SubscribeToStreamUtil for Server-sent events.
*/
private static final class ServerSentEvents extends SubscribeToStreamUtil {
static final ServerSentEvents INSTANCE = new ServerSentEvents();
}
/**
- * Implementation of {@link UrlResolver} for Web sockets.
+ * Implementation of SubscribeToStreamUtil for Web sockets.
*/
private static final class WebSockets extends SubscribeToStreamUtil {
static final WebSockets INSTANCE = new WebSockets();
* @param uriInfo URI information.
* @param notificationQueryParams Query parameters of notification.
* @param handlersHolder Holder of handlers for notifications.
- * @param urlResolver Resolver for proper implementation. Possibilities is WS or SSE.
* @return Stream location for listening.
*/
final @NonNull URI subscribeToYangStream(final String identifier, final UriInfo uriInfo,
final NotificationQueryParams notificationQueryParams, final HandlersHolder handlersHolder) {
final String streamName = ListenersBroker.createStreamNameFromUri(identifier);
- if (Strings.isNullOrEmpty(streamName)) {
+ if (isNullOrEmpty(streamName)) {
throw new RestconfDocumentedException("Stream name is empty.", ErrorType.PROTOCOL, ErrorTag.INVALID_VALUE);
}
- final Optional<NotificationListenerAdapter> notificationListenerAdapter =
- ListenersBroker.getInstance().getNotificationListenerFor(streamName);
-
- if (!notificationListenerAdapter.isPresent()) {
- throw new RestconfDocumentedException(String.format(
- "Stream with name %s was not found.", streamName),
- ErrorType.PROTOCOL,
- ErrorTag.UNKNOWN_ELEMENT);
- }
- final DOMTransactionChain transactionChain = handlersHolder.getTransactionChainHandler().get();
- final DOMDataTreeReadWriteTransaction writeTransaction = transactionChain.newReadWriteTransaction();
- final SchemaContext schemaContext = handlersHolder.getSchemaHandler().get();
- final boolean exist = checkExist(schemaContext, writeTransaction);
+ final NotificationListenerAdapter notificationListenerAdapter = ListenersBroker.getInstance()
+ .getNotificationListenerFor(streamName)
+ .orElseThrow(() -> new RestconfDocumentedException(
+ String.format("Stream with name %s was not found.", streamName),
+ ErrorType.PROTOCOL, ErrorTag.UNKNOWN_ELEMENT));
+ final EffectiveModelContext schemaContext = handlersHolder.getSchemaHandler().get();
final URI uri = prepareUriByStreamName(uriInfo, streamName);
- registerToListenNotification(
- notificationListenerAdapter.get(), handlersHolder.getNotificationServiceHandler());
- notificationListenerAdapter.get().setQueryParams(
- notificationQueryParams.getStart(),
- notificationQueryParams.getStop().orElse(null),
- notificationQueryParams.getFilter().orElse(null),
- false);
- notificationListenerAdapter.get().setCloseVars(
- handlersHolder.getTransactionChainHandler(), handlersHolder.getSchemaHandler());
- final NormalizedNode<?, ?> mapToStreams =
- RestconfMappingNodeUtil.mapYangNotificationStreamByIetfRestconfMonitoring(
- notificationListenerAdapter.get().getSchemaPath().getLastComponent(),
- schemaContext.getNotifications(), notificationQueryParams.getStart(),
- notificationListenerAdapter.get().getOutputType(), uri, getMonitoringModule(schemaContext), exist);
- writeDataToDS(schemaContext,
- notificationListenerAdapter.get().getSchemaPath().getLastComponent().getLocalName(), writeTransaction,
- exist, mapToStreams);
+ notificationListenerAdapter.setQueryParams(notificationQueryParams);
+ notificationListenerAdapter.listen(handlersHolder.getNotificationServiceHandler());
+ final DOMDataBroker dataBroker = handlersHolder.getDataBroker();
+ notificationListenerAdapter.setCloseVars(dataBroker, handlersHolder.getSchemaHandler());
+ final MapEntryNode mapToStreams = RestconfMappingNodeUtil.mapYangNotificationStreamByIetfRestconfMonitoring(
+ notificationListenerAdapter.getSchemaPath().lastNodeIdentifier(),
+ schemaContext.getNotifications(), notificationListenerAdapter.getStart(),
+ notificationListenerAdapter.getOutputType(), uri);
+
+ // FIXME: how does this correlate with the transaction notificationListenerAdapter.close() will do?
+ final DOMDataTreeWriteTransaction writeTransaction = dataBroker.newWriteOnlyTransaction();
+ writeDataToDS(writeTransaction, mapToStreams);
submitData(writeTransaction);
- transactionChain.close();
return uri;
}
final URI subscribeToDataStream(final String identifier, final UriInfo uriInfo,
final NotificationQueryParams notificationQueryParams, final HandlersHolder handlersHolder) {
final Map<String, String> mapOfValues = mapValuesFromUri(identifier);
- final LogicalDatastoreType datastoreType = parseURIEnum(
- LogicalDatastoreType.class,
- mapOfValues.get(RestconfStreamsConstants.DATASTORE_PARAM_NAME));
- if (datastoreType == null) {
- final String message = "Stream name doesn't contain datastore value (pattern /datastore=)";
+
+ final String datastoreParam = mapOfValues.get(RestconfStreamsConstants.DATASTORE_PARAM_NAME);
+ if (isNullOrEmpty(datastoreParam)) {
+ final String message = "Stream name does not contain datastore value (pattern /datastore=)";
LOG.debug(message);
throw new RestconfDocumentedException(message, ErrorType.APPLICATION, ErrorTag.MISSING_ATTRIBUTE);
}
- final DataChangeScope scope = parseURIEnum(
- DataChangeScope.class,
- mapOfValues.get(RestconfStreamsConstants.SCOPE_PARAM_NAME));
- if (scope == null) {
- final String message = "Stream name doesn't contains datastore value (pattern /scope=)";
+ // FIXME: this is kept only for compatibility, we are not using this parameter
+ if (isNullOrEmpty(mapOfValues.get(RestconfStreamsConstants.SCOPE_PARAM_NAME))) {
+ final String message = "Stream name does not contain scope value (pattern /scope=)";
LOG.warn(message);
throw new RestconfDocumentedException(message, ErrorType.APPLICATION, ErrorTag.MISSING_ATTRIBUTE);
}
final String streamName = ListenersBroker.createStreamNameFromUri(identifier);
- final Optional<ListenerAdapter> listener = ListenersBroker.getInstance().getDataChangeListenerFor(streamName);
- Preconditions.checkArgument(listener.isPresent(), "Listener doesn't exist : " + streamName);
+ final ListenerAdapter listener = ListenersBroker.getInstance().getDataChangeListenerFor(streamName)
+ .orElseThrow(() -> new RestconfDocumentedException("No listener found for stream " + streamName,
+ ErrorType.APPLICATION, ErrorTag.DATA_MISSING));
+ listener.setQueryParams(notificationQueryParams);
- listener.get().setQueryParams(
- notificationQueryParams.getStart(),
- notificationQueryParams.getStop().orElse(null),
- notificationQueryParams.getFilter().orElse(null),
- false);
- listener.get().setCloseVars(handlersHolder.getTransactionChainHandler(), handlersHolder.getSchemaHandler());
- registration(datastoreType, listener.get(), handlersHolder.getDomDataBrokerHandler().get());
+ final DOMDataBroker dataBroker = handlersHolder.getDataBroker();
+ final SchemaContextHandler schemaHandler = handlersHolder.getSchemaHandler();
+ listener.setCloseVars(dataBroker, schemaHandler);
+ listener.listen(dataBroker, LogicalDatastoreType.valueOf(datastoreParam));
final URI uri = prepareUriByStreamName(uriInfo, streamName);
- final DOMTransactionChain transactionChain = handlersHolder.getTransactionChainHandler().get();
- final DOMDataTreeReadWriteTransaction writeTransaction = transactionChain.newReadWriteTransaction();
- final EffectiveModelContext schemaContext = handlersHolder.getSchemaHandler().get();
- final boolean exist = checkExist(schemaContext, writeTransaction);
-
- final NormalizedNode<?, ?> mapToStreams = RestconfMappingNodeUtil
- .mapDataChangeNotificationStreamByIetfRestconfMonitoring(listener.get().getPath(),
- notificationQueryParams.getStart(), listener.get().getOutputType(), uri,
- getMonitoringModule(schemaContext), exist, schemaContext);
- writeDataToDS(schemaContext, listener.get().getPath().getLastPathArgument().getNodeType().getLocalName(),
- writeTransaction, exist, mapToStreams);
+ final EffectiveModelContext schemaContext = schemaHandler.get();
+ final String serializedPath = IdentifierCodec.serialize(listener.getPath(), schemaContext);
+
+ final MapEntryNode mapToStreams =
+ RestconfMappingNodeUtil.mapDataChangeNotificationStreamByIetfRestconfMonitoring(listener.getPath(),
+ listener.getStart(), listener.getOutputType(), uri, schemaContext, serializedPath);
+ final DOMDataTreeWriteTransaction writeTransaction = dataBroker.newWriteOnlyTransaction();
+ writeDataToDS(writeTransaction, mapToStreams);
submitData(writeTransaction);
- transactionChain.close();
return uri;
}
- static Module getMonitoringModule(final SchemaContext schemaContext) {
- return schemaContext.findModule(MonitoringModule.MODULE_QNAME).orElse(null);
+ // FIXME: callers are utter duplicates, refactor them
+ private static void writeDataToDS(final DOMDataTreeWriteOperations tx, final MapEntryNode mapToStreams) {
+ // FIXME: use put() here
+ tx.merge(LogicalDatastoreType.OPERATIONAL, Rfc8040.restconfStateStreamPath(mapToStreams.getIdentifier()),
+ mapToStreams);
}
- private static void writeDataToDS(final SchemaContext schemaContext, final String name,
- final DOMDataTreeReadWriteTransaction readWriteTransaction, final boolean exist,
- final NormalizedNode<?, ?> mapToStreams) {
- String pathId;
- if (exist) {
- pathId = MonitoringModule.PATH_TO_STREAM_WITHOUT_KEY + name;
- } else {
- pathId = MonitoringModule.PATH_TO_STREAMS;
- }
- readWriteTransaction.merge(LogicalDatastoreType.OPERATIONAL,
- IdentifierCodec.deserialize(pathId, schemaContext), mapToStreams);
- }
-
- private static void submitData(final DOMDataTreeReadWriteTransaction readWriteTransaction) {
+ private static void submitData(final DOMDataTreeWriteTransaction readWriteTransaction) {
try {
readWriteTransaction.commit().get();
} catch (final InterruptedException | ExecutionException e) {
}
return result;
}
-
- /**
- * Register data change listener in DOM data broker and set it to listener on stream.
- *
- * @param datastore {@link LogicalDatastoreType}
- * @param listener listener on specific stream
- * @param domDataBroker data broker for register data change listener
- */
- private static void registration(final LogicalDatastoreType datastore, final ListenerAdapter listener,
- final DOMDataBroker domDataBroker) {
- if (listener.isListening()) {
- return;
- }
-
- final DOMDataTreeChangeService changeService = domDataBroker.getExtensions()
- .getInstance(DOMDataTreeChangeService.class);
- if (changeService == null) {
- throw new UnsupportedOperationException("DOMDataBroker does not support the DOMDataTreeChangeService");
- }
-
- final DOMDataTreeIdentifier root = new DOMDataTreeIdentifier(datastore, listener.getPath());
- final ListenerRegistration<ListenerAdapter> registration =
- changeService.registerDataTreeChangeListener(root, listener);
- listener.setRegistration(registration);
- }
-
- private static boolean checkExist(final SchemaContext schemaContext,
- final DOMDataTreeReadOperations readWriteTransaction) {
- try {
- return readWriteTransaction.exists(LogicalDatastoreType.OPERATIONAL,
- IdentifierCodec.deserialize(MonitoringModule.PATH_TO_STREAMS, schemaContext)).get();
- } catch (final InterruptedException | ExecutionException exception) {
- throw new RestconfDocumentedException("Problem while checking data if exists", exception);
- }
- }
-
- private static void registerToListenNotification(final NotificationListenerAdapter listener,
- final NotificationServiceHandler notificationServiceHandler) {
- if (listener.isListening()) {
- return;
- }
-
- final SchemaPath path = listener.getSchemaPath();
- final ListenerRegistration<DOMNotificationListener> registration =
- notificationServiceHandler.get().registerNotificationListener(listener, path);
- listener.setRegistration(registration);
- }
-
- /**
- * Parse out enumeration from URI.
- *
- * @param clazz Target enumeration type.
- * @param value String representation of enumeration value.
- * @return Parsed enumeration type.
- */
- private static <T> T parseURIEnum(final Class<T> clazz, final String value) {
- if (value == null || value.equals("")) {
- return null;
- }
- return ResolveEnumUtil.resolveEnum(clazz, value);
- }
}