import org.opendaylight.mdsal.dom.api.DOMDataBroker;
import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeService;
import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
-import org.opendaylight.mdsal.dom.api.DOMDataTreeReadOperations;
import org.opendaylight.mdsal.dom.api.DOMDataTreeReadWriteTransaction;
import org.opendaylight.mdsal.dom.api.DOMNotificationListener;
import org.opendaylight.mdsal.dom.api.DOMTransactionChain;
import org.opendaylight.restconf.nb.rfc8040.utils.mapping.RestconfMappingNodeUtil;
import org.opendaylight.restconf.nb.rfc8040.utils.parser.IdentifierCodec;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
-import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
import org.opendaylight.yangtools.yang.model.api.Module;
import org.opendaylight.yangtools.yang.model.api.stmt.SchemaNodeIdentifier.Absolute;
*/
abstract class SubscribeToStreamUtil {
/**
- * Implementation of {@link UrlResolver} for Server-sent events.
+ * Implementation of SubscribeToStreamUtil for Server-sent events.
*/
private static final class ServerSentEvents extends SubscribeToStreamUtil {
static final ServerSentEvents INSTANCE = new ServerSentEvents();
}
/**
- * Implementation of {@link UrlResolver} for Web sockets.
+ * Implementation of SubscribeToStreamUtil for Web sockets.
*/
private static final class WebSockets extends SubscribeToStreamUtil {
static final WebSockets INSTANCE = new WebSockets();
* @param uriInfo URI information.
* @param notificationQueryParams Query parameters of notification.
* @param handlersHolder Holder of handlers for notifications.
- * @param urlResolver Resolver for proper implementation. Possibilities is WS or SSE.
* @return Stream location for listening.
*/
final @NonNull URI subscribeToYangStream(final String identifier, final UriInfo uriInfo,
final Optional<NotificationListenerAdapter> notificationListenerAdapter =
ListenersBroker.getInstance().getNotificationListenerFor(streamName);
- if (!notificationListenerAdapter.isPresent()) {
+ if (notificationListenerAdapter.isEmpty()) {
throw new RestconfDocumentedException(String.format(
"Stream with name %s was not found.", streamName),
ErrorType.PROTOCOL,
final DOMTransactionChain transactionChain = handlersHolder.getTransactionChainHandler().get();
final DOMDataTreeReadWriteTransaction writeTransaction = transactionChain.newReadWriteTransaction();
final EffectiveModelContext schemaContext = handlersHolder.getSchemaHandler().get();
- final boolean exist = checkExist(schemaContext, writeTransaction);
final URI uri = prepareUriByStreamName(uriInfo, streamName);
registerToListenNotification(
false, notificationQueryParams.isSkipNotificationData());
notificationListenerAdapter.get().setCloseVars(
handlersHolder.getTransactionChainHandler(), handlersHolder.getSchemaHandler());
- final NormalizedNode<?, ?> mapToStreams =
- RestconfMappingNodeUtil.mapYangNotificationStreamByIetfRestconfMonitoring(
+ final MapEntryNode mapToStreams = RestconfMappingNodeUtil.mapYangNotificationStreamByIetfRestconfMonitoring(
notificationListenerAdapter.get().getSchemaPath().lastNodeIdentifier(),
schemaContext.getNotifications(), notificationQueryParams.getStart(),
- notificationListenerAdapter.get().getOutputType(), uri, getMonitoringModule(schemaContext), exist);
+ notificationListenerAdapter.get().getOutputType(), uri, getMonitoringModule(schemaContext));
writeDataToDS(schemaContext,
- notificationListenerAdapter.get().getSchemaPath().lastNodeIdentifier().getLocalName(), writeTransaction,
- exist, mapToStreams);
+ notificationListenerAdapter.get().getSchemaPath().lastNodeIdentifier().getLocalName(), writeTransaction,
+ mapToStreams);
submitData(writeTransaction);
transactionChain.close();
return uri;
final DOMTransactionChain transactionChain = handlersHolder.getTransactionChainHandler().get();
final DOMDataTreeReadWriteTransaction writeTransaction = transactionChain.newReadWriteTransaction();
final EffectiveModelContext schemaContext = handlersHolder.getSchemaHandler().get();
- final boolean exist = checkExist(schemaContext, writeTransaction);
+ final String serializedPath = IdentifierCodec.serialize(listener.get().getPath(), schemaContext);
- final NormalizedNode<?, ?> mapToStreams = RestconfMappingNodeUtil
- .mapDataChangeNotificationStreamByIetfRestconfMonitoring(listener.get().getPath(),
- notificationQueryParams.getStart(), listener.get().getOutputType(), uri,
- getMonitoringModule(schemaContext), exist, schemaContext);
- writeDataToDS(schemaContext, listener.get().getPath().getLastPathArgument().getNodeType().getLocalName(),
- writeTransaction, exist, mapToStreams);
+ final MapEntryNode mapToStreams =
+ RestconfMappingNodeUtil.mapDataChangeNotificationStreamByIetfRestconfMonitoring(listener.get().getPath(),
+ notificationQueryParams.getStart(), listener.get().getOutputType(), uri,
+ getMonitoringModule(schemaContext), schemaContext, serializedPath);
+ writeDataToDS(schemaContext, serializedPath, writeTransaction, mapToStreams);
submitData(writeTransaction);
transactionChain.close();
return uri;
}
private static void writeDataToDS(final EffectiveModelContext schemaContext, final String name,
- final DOMDataTreeReadWriteTransaction readWriteTransaction, final boolean exist,
- final NormalizedNode<?, ?> mapToStreams) {
- String pathId;
- if (exist) {
- pathId = MonitoringModule.PATH_TO_STREAM_WITHOUT_KEY + name;
- } else {
- pathId = MonitoringModule.PATH_TO_STREAMS;
- }
+ final DOMDataTreeReadWriteTransaction readWriteTransaction, final MapEntryNode mapToStreams) {
readWriteTransaction.merge(LogicalDatastoreType.OPERATIONAL,
- IdentifierCodec.deserialize(pathId, schemaContext), mapToStreams);
+ // FIXME: do not use IdentifierCodec here
+ IdentifierCodec.deserialize(MonitoringModule.PATH_TO_STREAM_WITHOUT_KEY + name, schemaContext),
+ mapToStreams);
}
private static void submitData(final DOMDataTreeReadWriteTransaction readWriteTransaction) {
listener.setRegistration(registration);
}
- private static boolean checkExist(final EffectiveModelContext schemaContext,
- final DOMDataTreeReadOperations readWriteTransaction) {
- try {
- return readWriteTransaction.exists(LogicalDatastoreType.OPERATIONAL,
- IdentifierCodec.deserialize(MonitoringModule.PATH_TO_STREAMS, schemaContext)).get();
- } catch (final InterruptedException | ExecutionException exception) {
- throw new RestconfDocumentedException("Problem while checking data if exists", exception);
- }
- }
-
private static void registerToListenNotification(final NotificationListenerAdapter listener,
final NotificationServiceHandler notificationServiceHandler) {
if (listener.isListening()) {