2 * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.restconf.nb.rfc8040.rests.services.impl;
10 import static com.google.common.base.Strings.isNullOrEmpty;
13 import java.util.HashMap;
15 import java.util.concurrent.ExecutionException;
16 import javax.ws.rs.core.UriBuilder;
17 import javax.ws.rs.core.UriInfo;
18 import org.eclipse.jdt.annotation.NonNull;
19 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
20 import org.opendaylight.mdsal.dom.api.DOMDataBroker;
21 import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteOperations;
22 import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteTransaction;
23 import org.opendaylight.restconf.common.errors.RestconfDocumentedException;
24 import org.opendaylight.restconf.nb.rfc8040.NotificationQueryParams;
25 import org.opendaylight.restconf.nb.rfc8040.Rfc8040;
26 import org.opendaylight.restconf.nb.rfc8040.handlers.SchemaContextHandler;
27 import org.opendaylight.restconf.nb.rfc8040.rests.services.impl.RestconfStreamsSubscriptionServiceImpl.HandlersHolder;
28 import org.opendaylight.restconf.nb.rfc8040.rests.utils.RestconfStreamsConstants;
29 import org.opendaylight.restconf.nb.rfc8040.streams.listeners.ListenerAdapter;
30 import org.opendaylight.restconf.nb.rfc8040.streams.listeners.ListenersBroker;
31 import org.opendaylight.restconf.nb.rfc8040.streams.listeners.NotificationListenerAdapter;
32 import org.opendaylight.restconf.nb.rfc8040.utils.RestconfConstants;
33 import org.opendaylight.restconf.nb.rfc8040.utils.mapping.RestconfMappingNodeUtil;
34 import org.opendaylight.restconf.nb.rfc8040.utils.parser.IdentifierCodec;
35 import org.opendaylight.yangtools.yang.common.ErrorTag;
36 import org.opendaylight.yangtools.yang.common.ErrorType;
37 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
38 import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
39 import org.slf4j.Logger;
40 import org.slf4j.LoggerFactory;
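/**
 * Utility base class for subscribing to RESTCONF notification streams. Concrete variants (Server-sent events and
 * WebSockets) differ only in how the stream location URI is built.
 */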
45 abstract class SubscribeToStreamUtil {
47 * Implementation of SubscribeToStreamUtil for Server-sent events.
49 private static final class ServerSentEvents extends SubscribeToStreamUtil {
50 static final ServerSentEvents INSTANCE = new ServerSentEvents();
53 public URI prepareUriByStreamName(final UriInfo uriInfo, final String streamName) {
54 final UriBuilder uriBuilder = uriInfo.getBaseUriBuilder();
55 return uriBuilder.replacePath(RestconfConstants.BASE_URI_PATTERN + '/'
56 + RestconfConstants.NOTIF + '/' + streamName).build();
61 * Implementation of SubscribeToStreamUtil for Web sockets.
63 private static final class WebSockets extends SubscribeToStreamUtil {
64 static final WebSockets INSTANCE = new WebSockets();
67 public URI prepareUriByStreamName(final UriInfo uriInfo, final String streamName) {
68 final String scheme = uriInfo.getAbsolutePath().getScheme();
69 final UriBuilder uriBuilder = uriInfo.getBaseUriBuilder();
72 // Secured HTTP goes to Secured WebSockets
73 uriBuilder.scheme("wss");
77 // Unsecured HTTP and others go to unsecured WebSockets
78 uriBuilder.scheme("ws");
80 return uriBuilder.replacePath(RestconfConstants.BASE_URI_PATTERN + '/' + streamName).build();
85 private static final Logger LOG = LoggerFactory.getLogger(SubscribeToStreamUtil.class);
/**
 * Package-private constructor; instances are handed out through {@link #serverSentEvents()} and
 * {@link #webSockets()}.
 */
SubscribeToStreamUtil() {
    // Nothing to initialise.
}
91 static SubscribeToStreamUtil serverSentEvents() {
92 return ServerSentEvents.INSTANCE;
95 static SubscribeToStreamUtil webSockets() {
96 return WebSockets.INSTANCE;
100 * Prepare URL from base name and stream name.
102 * @param uriInfo base URL information
103 * @param streamName name of stream for create
106 abstract @NonNull URI prepareUriByStreamName(UriInfo uriInfo, String streamName);
109 * Register listener by streamName in identifier to listen to yang notifications, and put or delete information
110 * about listener to DS according to ietf-restconf-monitoring.
112 * @param identifier Name of the stream.
113 * @param uriInfo URI information.
114 * @param notificationQueryParams Query parameters of notification.
115 * @param handlersHolder Holder of handlers for notifications.
116 * @return Stream location for listening.
118 final @NonNull URI subscribeToYangStream(final String identifier, final UriInfo uriInfo,
119 final NotificationQueryParams notificationQueryParams, final HandlersHolder handlersHolder) {
120 final String streamName = ListenersBroker.createStreamNameFromUri(identifier);
121 if (isNullOrEmpty(streamName)) {
122 throw new RestconfDocumentedException("Stream name is empty.", ErrorType.PROTOCOL, ErrorTag.INVALID_VALUE);
125 final NotificationListenerAdapter notificationListenerAdapter = ListenersBroker.getInstance()
126 .getNotificationListenerFor(streamName)
127 .orElseThrow(() -> new RestconfDocumentedException(
128 String.format("Stream with name %s was not found.", streamName),
129 ErrorType.PROTOCOL, ErrorTag.UNKNOWN_ELEMENT));
131 final EffectiveModelContext schemaContext = handlersHolder.getSchemaHandler().get();
132 final URI uri = prepareUriByStreamName(uriInfo, streamName);
133 notificationListenerAdapter.listen(handlersHolder.getNotificationServiceHandler());
134 notificationListenerAdapter.setQueryParams(
135 notificationQueryParams.startTime(),
136 notificationQueryParams.stopTime(),
137 notificationQueryParams.filter(),
138 notificationQueryParams.leafNodesOnly(),
139 notificationQueryParams.skipNotificationData());
140 final DOMDataBroker dataBroker = handlersHolder.getDataBroker();
141 notificationListenerAdapter.setCloseVars(dataBroker, handlersHolder.getSchemaHandler());
142 final MapEntryNode mapToStreams = RestconfMappingNodeUtil.mapYangNotificationStreamByIetfRestconfMonitoring(
143 notificationListenerAdapter.getSchemaPath().lastNodeIdentifier(),
144 schemaContext.getNotifications(), notificationListenerAdapter.getStart(),
145 notificationListenerAdapter.getOutputType(), uri);
147 // FIXME: how does this correlate with the transaction notificationListenerAdapter.close() will do?
148 final DOMDataTreeWriteTransaction writeTransaction = dataBroker.newWriteOnlyTransaction();
149 writeDataToDS(writeTransaction, mapToStreams);
150 submitData(writeTransaction);
155 * Register listener by streamName in identifier to listen to data change notifications, and put or delete
156 * information about listener to DS according to ietf-restconf-monitoring.
158 * @param identifier Identifier as stream name.
159 * @param uriInfo Base URI information.
160 * @param notificationQueryParams Query parameters of notification.
161 * @param handlersHolder Holder of handlers for notifications.
162 * @return Location for listening.
164 final URI subscribeToDataStream(final String identifier, final UriInfo uriInfo,
165 final NotificationQueryParams notificationQueryParams, final HandlersHolder handlersHolder) {
166 final Map<String, String> mapOfValues = mapValuesFromUri(identifier);
168 final String datastoreParam = mapOfValues.get(RestconfStreamsConstants.DATASTORE_PARAM_NAME);
169 if (isNullOrEmpty(datastoreParam)) {
170 final String message = "Stream name does not contain datastore value (pattern /datastore=)";
172 throw new RestconfDocumentedException(message, ErrorType.APPLICATION, ErrorTag.MISSING_ATTRIBUTE);
175 // FIXME: this is kept only for compatibility, we are not using this parameter
176 if (isNullOrEmpty(mapOfValues.get(RestconfStreamsConstants.SCOPE_PARAM_NAME))) {
177 final String message = "Stream name does not contain scope value (pattern /scope=)";
179 throw new RestconfDocumentedException(message, ErrorType.APPLICATION, ErrorTag.MISSING_ATTRIBUTE);
182 final String streamName = ListenersBroker.createStreamNameFromUri(identifier);
183 final ListenerAdapter listener = ListenersBroker.getInstance().getDataChangeListenerFor(streamName)
184 .orElseThrow(() -> new RestconfDocumentedException("No listener found for stream " + streamName,
185 ErrorType.APPLICATION, ErrorTag.DATA_MISSING));
187 listener.setQueryParams(
188 notificationQueryParams.startTime(),
189 notificationQueryParams.stopTime(),
190 notificationQueryParams.filter(),
191 notificationQueryParams.leafNodesOnly(),
192 notificationQueryParams.skipNotificationData());
194 final DOMDataBroker dataBroker = handlersHolder.getDataBroker();
195 final SchemaContextHandler schemaHandler = handlersHolder.getSchemaHandler();
196 listener.setCloseVars(dataBroker, schemaHandler);
197 listener.listen(dataBroker, LogicalDatastoreType.valueOf(datastoreParam));
199 final URI uri = prepareUriByStreamName(uriInfo, streamName);
200 final EffectiveModelContext schemaContext = schemaHandler.get();
201 final String serializedPath = IdentifierCodec.serialize(listener.getPath(), schemaContext);
203 final MapEntryNode mapToStreams =
204 RestconfMappingNodeUtil.mapDataChangeNotificationStreamByIetfRestconfMonitoring(listener.getPath(),
205 listener.getStart(), listener.getOutputType(), uri, schemaContext, serializedPath);
206 final DOMDataTreeWriteTransaction writeTransaction = dataBroker.newWriteOnlyTransaction();
207 writeDataToDS(writeTransaction, mapToStreams);
208 submitData(writeTransaction);
212 // FIXME: callers are utter duplicates, refactor them
213 private static void writeDataToDS(final DOMDataTreeWriteOperations tx, final MapEntryNode mapToStreams) {
214 // FIXME: use put() here
215 tx.merge(LogicalDatastoreType.OPERATIONAL, Rfc8040.restconfStateStreamPath(mapToStreams.getIdentifier()),
219 private static void submitData(final DOMDataTreeWriteTransaction readWriteTransaction) {
221 readWriteTransaction.commit().get();
222 } catch (final InterruptedException | ExecutionException e) {
223 throw new RestconfDocumentedException("Problem while putting data to DS.", e);
228 * Prepare map of URI parameter-values.
230 * @param identifier String identification of URI.
231 * @return Map od URI parameters and values.
233 private static Map<String, String> mapValuesFromUri(final String identifier) {
234 final HashMap<String, String> result = new HashMap<>();
235 for (final String token : RestconfConstants.SLASH_SPLITTER.split(identifier)) {
236 final String[] paramToken = token.split("=");
237 if (paramToken.length == 2) {
238 result.put(paramToken[0], paramToken[1]);