*/
package org.opendaylight.restconf.nb.rfc8040.rests.services.impl;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Strings;
+import static com.google.common.base.Strings.isNullOrEmpty;
+
import java.net.URI;
import java.util.HashMap;
import java.util.Map;
-import java.util.Optional;
import java.util.concurrent.ExecutionException;
import javax.ws.rs.core.UriBuilder;
import javax.ws.rs.core.UriInfo;
import org.eclipse.jdt.annotation.NonNull;
import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
import org.opendaylight.mdsal.dom.api.DOMDataBroker;
-import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeService;
-import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
-import org.opendaylight.mdsal.dom.api.DOMDataTreeReadWriteTransaction;
-import org.opendaylight.mdsal.dom.api.DOMNotificationListener;
-import org.opendaylight.mdsal.dom.api.DOMNotificationService;
-import org.opendaylight.mdsal.dom.api.DOMTransactionChain;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteOperations;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteTransaction;
import org.opendaylight.restconf.common.errors.RestconfDocumentedException;
import org.opendaylight.restconf.common.errors.RestconfError.ErrorTag;
-import org.opendaylight.restconf.common.errors.RestconfError.ErrorType;
-import org.opendaylight.restconf.common.util.DataChangeScope;
-import org.opendaylight.restconf.nb.rfc8040.Rfc8040.MonitoringModule;
+import org.opendaylight.restconf.nb.rfc8040.Rfc8040;
+import org.opendaylight.restconf.nb.rfc8040.handlers.SchemaContextHandler;
import org.opendaylight.restconf.nb.rfc8040.rests.services.impl.RestconfStreamsSubscriptionServiceImpl.HandlersHolder;
import org.opendaylight.restconf.nb.rfc8040.rests.services.impl.RestconfStreamsSubscriptionServiceImpl.NotificationQueryParams;
-import org.opendaylight.restconf.nb.rfc8040.rests.utils.ResolveEnumUtil;
import org.opendaylight.restconf.nb.rfc8040.rests.utils.RestconfStreamsConstants;
import org.opendaylight.restconf.nb.rfc8040.streams.listeners.ListenerAdapter;
import org.opendaylight.restconf.nb.rfc8040.streams.listeners.ListenersBroker;
import org.opendaylight.restconf.nb.rfc8040.utils.RestconfConstants;
import org.opendaylight.restconf.nb.rfc8040.utils.mapping.RestconfMappingNodeUtil;
import org.opendaylight.restconf.nb.rfc8040.utils.parser.IdentifierCodec;
-import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.yang.common.ErrorType;
import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
-import org.opendaylight.yangtools.yang.model.api.stmt.SchemaNodeIdentifier.Absolute;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
final @NonNull URI subscribeToYangStream(final String identifier, final UriInfo uriInfo,
final NotificationQueryParams notificationQueryParams, final HandlersHolder handlersHolder) {
final String streamName = ListenersBroker.createStreamNameFromUri(identifier);
- if (Strings.isNullOrEmpty(streamName)) {
+ if (isNullOrEmpty(streamName)) {
throw new RestconfDocumentedException("Stream name is empty.", ErrorType.PROTOCOL, ErrorTag.INVALID_VALUE);
}
- final Optional<NotificationListenerAdapter> notificationListenerAdapter =
- ListenersBroker.getInstance().getNotificationListenerFor(streamName);
- if (notificationListenerAdapter.isEmpty()) {
- throw new RestconfDocumentedException(String.format(
- "Stream with name %s was not found.", streamName),
- ErrorType.PROTOCOL,
- ErrorTag.UNKNOWN_ELEMENT);
- }
+ final NotificationListenerAdapter notificationListenerAdapter = ListenersBroker.getInstance()
+ .getNotificationListenerFor(streamName)
+ .orElseThrow(() -> new RestconfDocumentedException(
+ String.format("Stream with name %s was not found.", streamName),
+ ErrorType.PROTOCOL, ErrorTag.UNKNOWN_ELEMENT));
- final DOMTransactionChain transactionChain = handlersHolder.getTransactionChainHandler().get();
- final DOMDataTreeReadWriteTransaction writeTransaction = transactionChain.newReadWriteTransaction();
final EffectiveModelContext schemaContext = handlersHolder.getSchemaHandler().get();
-
final URI uri = prepareUriByStreamName(uriInfo, streamName);
- registerToListenNotification(
- notificationListenerAdapter.get(), handlersHolder.getNotificationServiceHandler());
- notificationListenerAdapter.get().setQueryParams(
+ notificationListenerAdapter.listen(handlersHolder.getNotificationServiceHandler());
+ notificationListenerAdapter.setQueryParams(
notificationQueryParams.getStart(),
notificationQueryParams.getStop().orElse(null),
notificationQueryParams.getFilter().orElse(null),
false, notificationQueryParams.isSkipNotificationData());
- notificationListenerAdapter.get().setCloseVars(
- handlersHolder.getTransactionChainHandler(), handlersHolder.getSchemaHandler());
+ final DOMDataBroker dataBroker = handlersHolder.getDataBroker();
+ notificationListenerAdapter.setCloseVars(dataBroker, handlersHolder.getSchemaHandler());
final MapEntryNode mapToStreams = RestconfMappingNodeUtil.mapYangNotificationStreamByIetfRestconfMonitoring(
- notificationListenerAdapter.get().getSchemaPath().lastNodeIdentifier(),
+ notificationListenerAdapter.getSchemaPath().lastNodeIdentifier(),
schemaContext.getNotifications(), notificationQueryParams.getStart(),
- notificationListenerAdapter.get().getOutputType(), uri);
- writeDataToDS(schemaContext,
- notificationListenerAdapter.get().getSchemaPath().lastNodeIdentifier().getLocalName(), writeTransaction,
- mapToStreams);
+ notificationListenerAdapter.getOutputType(), uri);
+
+ // FIXME: how does this correlate with the transaction notificationListenerAdapter.close() will do?
+ final DOMDataTreeWriteTransaction writeTransaction = dataBroker.newWriteOnlyTransaction();
+ writeDataToDS(writeTransaction, mapToStreams);
submitData(writeTransaction);
- transactionChain.close();
return uri;
}
final URI subscribeToDataStream(final String identifier, final UriInfo uriInfo,
final NotificationQueryParams notificationQueryParams, final HandlersHolder handlersHolder) {
final Map<String, String> mapOfValues = mapValuesFromUri(identifier);
- final LogicalDatastoreType datastoreType = parseURIEnum(
- LogicalDatastoreType.class,
- mapOfValues.get(RestconfStreamsConstants.DATASTORE_PARAM_NAME));
- if (datastoreType == null) {
- final String message = "Stream name doesn't contain datastore value (pattern /datastore=)";
+
+ final String datastoreParam = mapOfValues.get(RestconfStreamsConstants.DATASTORE_PARAM_NAME);
+ if (isNullOrEmpty(datastoreParam)) {
+ final String message = "Stream name does not contain datastore value (pattern /datastore=)";
LOG.debug(message);
throw new RestconfDocumentedException(message, ErrorType.APPLICATION, ErrorTag.MISSING_ATTRIBUTE);
}
- final DataChangeScope scope = parseURIEnum(
- DataChangeScope.class,
- mapOfValues.get(RestconfStreamsConstants.SCOPE_PARAM_NAME));
- if (scope == null) {
- final String message = "Stream name doesn't contains datastore value (pattern /scope=)";
+ // FIXME: this is kept only for compatibility, we are not using this parameter
+ if (isNullOrEmpty(mapOfValues.get(RestconfStreamsConstants.SCOPE_PARAM_NAME))) {
+ final String message = "Stream name does not contain scope value (pattern /scope=)";
LOG.warn(message);
throw new RestconfDocumentedException(message, ErrorType.APPLICATION, ErrorTag.MISSING_ATTRIBUTE);
}
final String streamName = ListenersBroker.createStreamNameFromUri(identifier);
- final Optional<ListenerAdapter> listener = ListenersBroker.getInstance().getDataChangeListenerFor(streamName);
- Preconditions.checkArgument(listener.isPresent(), "Listener doesn't exist : " + streamName);
- listener.get().setQueryParams(
+ final ListenerAdapter listener = ListenersBroker.getInstance()
+ .getDataChangeListenerFor(streamName)
+ .orElseThrow(() -> new IllegalArgumentException("Listener does not exist : " + streamName));
+
+ listener.setQueryParams(
notificationQueryParams.getStart(),
notificationQueryParams.getStop().orElse(null),
notificationQueryParams.getFilter().orElse(null),
false, notificationQueryParams.isSkipNotificationData());
- listener.get().setCloseVars(handlersHolder.getTransactionChainHandler(), handlersHolder.getSchemaHandler());
- registration(datastoreType, listener.get(), handlersHolder.getDomDataBrokerHandler().get());
+
+ final DOMDataBroker dataBroker = handlersHolder.getDataBroker();
+ final SchemaContextHandler schemaHandler = handlersHolder.getSchemaHandler();
+ listener.setCloseVars(dataBroker, schemaHandler);
+ listener.listen(dataBroker, LogicalDatastoreType.valueOf(datastoreParam));
final URI uri = prepareUriByStreamName(uriInfo, streamName);
- final DOMTransactionChain transactionChain = handlersHolder.getTransactionChainHandler().get();
- final DOMDataTreeReadWriteTransaction writeTransaction = transactionChain.newReadWriteTransaction();
- final EffectiveModelContext schemaContext = handlersHolder.getSchemaHandler().get();
- final String serializedPath = IdentifierCodec.serialize(listener.get().getPath(), schemaContext);
+ final EffectiveModelContext schemaContext = schemaHandler.get();
+ final String serializedPath = IdentifierCodec.serialize(listener.getPath(), schemaContext);
final MapEntryNode mapToStreams =
- RestconfMappingNodeUtil.mapDataChangeNotificationStreamByIetfRestconfMonitoring(listener.get().getPath(),
- notificationQueryParams.getStart(), listener.get().getOutputType(), uri, schemaContext, serializedPath);
- writeDataToDS(schemaContext, serializedPath, writeTransaction, mapToStreams);
+ RestconfMappingNodeUtil.mapDataChangeNotificationStreamByIetfRestconfMonitoring(listener.getPath(),
+ notificationQueryParams.getStart(), listener.getOutputType(), uri, schemaContext, serializedPath);
+ final DOMDataTreeWriteTransaction writeTransaction = dataBroker.newWriteOnlyTransaction();
+ writeDataToDS(writeTransaction, mapToStreams);
submitData(writeTransaction);
- transactionChain.close();
return uri;
}
- private static void writeDataToDS(final EffectiveModelContext schemaContext, final String name,
- final DOMDataTreeReadWriteTransaction readWriteTransaction, final MapEntryNode mapToStreams) {
- readWriteTransaction.merge(LogicalDatastoreType.OPERATIONAL,
- // FIXME: do not use IdentifierCodec here
- IdentifierCodec.deserialize(MonitoringModule.PATH_TO_STREAM_WITHOUT_KEY + name, schemaContext),
+ // FIXME: callers are utter duplicates, refactor them
+ private static void writeDataToDS(final DOMDataTreeWriteOperations tx, final MapEntryNode mapToStreams) {
+ // FIXME: use put() here
+ tx.merge(LogicalDatastoreType.OPERATIONAL, Rfc8040.restconfStateStreamPath(mapToStreams.getIdentifier()),
mapToStreams);
}
- private static void submitData(final DOMDataTreeReadWriteTransaction readWriteTransaction) {
+ private static void submitData(final DOMDataTreeWriteTransaction readWriteTransaction) {
try {
readWriteTransaction.commit().get();
} catch (final InterruptedException | ExecutionException e) {
}
return result;
}
-
- /**
- * Register data change listener in DOM data broker and set it to listener on stream.
- *
- * @param datastore {@link LogicalDatastoreType}
- * @param listener listener on specific stream
- * @param domDataBroker data broker for register data change listener
- */
- private static void registration(final LogicalDatastoreType datastore, final ListenerAdapter listener,
- final DOMDataBroker domDataBroker) {
- if (listener.isListening()) {
- return;
- }
-
- final DOMDataTreeChangeService changeService = domDataBroker.getExtensions()
- .getInstance(DOMDataTreeChangeService.class);
- if (changeService == null) {
- throw new UnsupportedOperationException("DOMDataBroker does not support the DOMDataTreeChangeService");
- }
-
- final DOMDataTreeIdentifier root = new DOMDataTreeIdentifier(datastore, listener.getPath());
- final ListenerRegistration<ListenerAdapter> registration =
- changeService.registerDataTreeChangeListener(root, listener);
- listener.setRegistration(registration);
- }
-
- // FIXME: this method should be in NotificationListenerAdapter
- private static void registerToListenNotification(final NotificationListenerAdapter listener,
- final DOMNotificationService notificationService) {
- if (listener.isListening()) {
- return;
- }
-
- final Absolute path = listener.getSchemaPath();
- final ListenerRegistration<DOMNotificationListener> registration =
- notificationService.registerNotificationListener(listener, path);
- listener.setRegistration(registration);
- }
-
- /**
- * Parse out enumeration from URI.
- *
- * @param clazz Target enumeration type.
- * @param value String representation of enumeration value.
- * @return Parsed enumeration type.
- */
- private static <T> T parseURIEnum(final Class<T> clazz, final String value) {
- if (value == null || value.equals("")) {
- return null;
- }
- return ResolveEnumUtil.resolveEnum(clazz, value);
- }
}