import org.eclipse.jdt.annotation.NonNull;
import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
import org.opendaylight.mdsal.dom.api.DOMDataBroker;
-import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeService;
-import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
-import org.opendaylight.mdsal.dom.api.DOMDataTreeReadWriteTransaction;
import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteOperations;
-import org.opendaylight.mdsal.dom.api.DOMNotificationListener;
-import org.opendaylight.mdsal.dom.api.DOMNotificationService;
-import org.opendaylight.mdsal.dom.api.DOMTransactionChain;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteTransaction;
import org.opendaylight.restconf.common.errors.RestconfDocumentedException;
-import org.opendaylight.restconf.common.errors.RestconfError.ErrorTag;
-import org.opendaylight.restconf.common.errors.RestconfError.ErrorType;
+import org.opendaylight.restconf.nb.rfc8040.NotificationQueryParams;
import org.opendaylight.restconf.nb.rfc8040.Rfc8040;
+import org.opendaylight.restconf.nb.rfc8040.handlers.SchemaContextHandler;
import org.opendaylight.restconf.nb.rfc8040.rests.services.impl.RestconfStreamsSubscriptionServiceImpl.HandlersHolder;
-import org.opendaylight.restconf.nb.rfc8040.rests.services.impl.RestconfStreamsSubscriptionServiceImpl.NotificationQueryParams;
import org.opendaylight.restconf.nb.rfc8040.rests.utils.RestconfStreamsConstants;
import org.opendaylight.restconf.nb.rfc8040.streams.listeners.ListenerAdapter;
import org.opendaylight.restconf.nb.rfc8040.streams.listeners.ListenersBroker;
import org.opendaylight.restconf.nb.rfc8040.utils.RestconfConstants;
import org.opendaylight.restconf.nb.rfc8040.utils.mapping.RestconfMappingNodeUtil;
import org.opendaylight.restconf.nb.rfc8040.utils.parser.IdentifierCodec;
-import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.yang.common.ErrorTag;
+import org.opendaylight.yangtools.yang.common.ErrorType;
import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
-import org.opendaylight.yangtools.yang.model.api.stmt.SchemaNodeIdentifier.Absolute;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
String.format("Stream with name %s was not found.", streamName),
ErrorType.PROTOCOL, ErrorTag.UNKNOWN_ELEMENT));
- final DOMTransactionChain transactionChain = handlersHolder.getTransactionChainHandler().get();
- final DOMDataTreeReadWriteTransaction writeTransaction = transactionChain.newReadWriteTransaction();
final EffectiveModelContext schemaContext = handlersHolder.getSchemaHandler().get();
-
final URI uri = prepareUriByStreamName(uriInfo, streamName);
- registerToListenNotification(notificationListenerAdapter, handlersHolder.getNotificationServiceHandler());
- notificationListenerAdapter.setQueryParams(
- notificationQueryParams.getStart(),
- notificationQueryParams.getStop().orElse(null),
- notificationQueryParams.getFilter().orElse(null),
- false, notificationQueryParams.isSkipNotificationData());
- notificationListenerAdapter.setCloseVars(
- handlersHolder.getTransactionChainHandler(), handlersHolder.getSchemaHandler());
+ notificationListenerAdapter.setQueryParams(notificationQueryParams);
+ notificationListenerAdapter.listen(handlersHolder.getNotificationServiceHandler());
+ final DOMDataBroker dataBroker = handlersHolder.getDataBroker();
+ notificationListenerAdapter.setCloseVars(dataBroker, handlersHolder.getSchemaHandler());
final MapEntryNode mapToStreams = RestconfMappingNodeUtil.mapYangNotificationStreamByIetfRestconfMonitoring(
notificationListenerAdapter.getSchemaPath().lastNodeIdentifier(),
- schemaContext.getNotifications(), notificationQueryParams.getStart(),
+ schemaContext.getNotifications(), notificationListenerAdapter.getStart(),
notificationListenerAdapter.getOutputType(), uri);
+
+ // FIXME: how does this correlate with the transaction notificationListenerAdapter.close() will do?
+ final DOMDataTreeWriteTransaction writeTransaction = dataBroker.newWriteOnlyTransaction();
writeDataToDS(writeTransaction, mapToStreams);
submitData(writeTransaction);
- transactionChain.close();
return uri;
}
}
final String streamName = ListenersBroker.createStreamNameFromUri(identifier);
+ final ListenerAdapter listener = ListenersBroker.getInstance().getDataChangeListenerFor(streamName)
+ .orElseThrow(() -> new RestconfDocumentedException("No listener found for stream " + streamName,
+ ErrorType.APPLICATION, ErrorTag.DATA_MISSING));
+ listener.setQueryParams(notificationQueryParams);
- final ListenerAdapter listener = ListenersBroker.getInstance()
- .getDataChangeListenerFor(streamName)
- .orElseThrow(() -> new IllegalArgumentException("Listener does not exist : " + streamName));
-
- listener.setQueryParams(
- notificationQueryParams.getStart(),
- notificationQueryParams.getStop().orElse(null),
- notificationQueryParams.getFilter().orElse(null),
- false, notificationQueryParams.isSkipNotificationData());
- listener.setCloseVars(handlersHolder.getTransactionChainHandler(), handlersHolder.getSchemaHandler());
-
- final LogicalDatastoreType datastoreType = LogicalDatastoreType.valueOf(datastoreParam);
- registration(datastoreType, listener, handlersHolder.getDataBroker());
+ final DOMDataBroker dataBroker = handlersHolder.getDataBroker();
+ final SchemaContextHandler schemaHandler = handlersHolder.getSchemaHandler();
+ listener.setCloseVars(dataBroker, schemaHandler);
+ listener.listen(dataBroker, LogicalDatastoreType.valueOf(datastoreParam));
final URI uri = prepareUriByStreamName(uriInfo, streamName);
- final DOMTransactionChain transactionChain = handlersHolder.getTransactionChainHandler().get();
- final DOMDataTreeReadWriteTransaction writeTransaction = transactionChain.newReadWriteTransaction();
- final EffectiveModelContext schemaContext = handlersHolder.getSchemaHandler().get();
+ final EffectiveModelContext schemaContext = schemaHandler.get();
final String serializedPath = IdentifierCodec.serialize(listener.getPath(), schemaContext);
final MapEntryNode mapToStreams =
RestconfMappingNodeUtil.mapDataChangeNotificationStreamByIetfRestconfMonitoring(listener.getPath(),
- notificationQueryParams.getStart(), listener.getOutputType(), uri, schemaContext, serializedPath);
+ listener.getStart(), listener.getOutputType(), uri, schemaContext, serializedPath);
+ final DOMDataTreeWriteTransaction writeTransaction = dataBroker.newWriteOnlyTransaction();
writeDataToDS(writeTransaction, mapToStreams);
submitData(writeTransaction);
- transactionChain.close();
return uri;
}
mapToStreams);
}
- private static void submitData(final DOMDataTreeReadWriteTransaction readWriteTransaction) {
+ private static void submitData(final DOMDataTreeWriteTransaction readWriteTransaction) {
try {
readWriteTransaction.commit().get();
} catch (final InterruptedException | ExecutionException e) {
}
return result;
}
-
- /**
- * Register data change listener in DOM data broker and set it to listener on stream.
- *
- * @param datastore {@link LogicalDatastoreType}
- * @param listener listener on specific stream
- * @param domDataBroker data broker for register data change listener
- */
- private static void registration(final LogicalDatastoreType datastore, final ListenerAdapter listener,
- final DOMDataBroker domDataBroker) {
- if (listener.isListening()) {
- return;
- }
-
- final DOMDataTreeChangeService changeService = domDataBroker.getExtensions()
- .getInstance(DOMDataTreeChangeService.class);
- if (changeService == null) {
- throw new UnsupportedOperationException("DOMDataBroker does not support the DOMDataTreeChangeService");
- }
-
- final DOMDataTreeIdentifier root = new DOMDataTreeIdentifier(datastore, listener.getPath());
- final ListenerRegistration<ListenerAdapter> registration =
- changeService.registerDataTreeChangeListener(root, listener);
- listener.setRegistration(registration);
- }
-
- // FIXME: this method should be in NotificationListenerAdapter
- private static void registerToListenNotification(final NotificationListenerAdapter listener,
- final DOMNotificationService notificationService) {
- if (listener.isListening()) {
- return;
- }
-
- final Absolute path = listener.getSchemaPath();
- final ListenerRegistration<DOMNotificationListener> registration =
- notificationService.registerNotificationListener(listener, path);
- listener.setRegistration(registration);
- }
}