2 * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.restconf.nb.rfc8040.rests.services.impl;
10 import static com.google.common.base.Strings.isNullOrEmpty;
11 import static java.util.Objects.requireNonNull;
13 import com.google.common.base.Splitter;
15 import java.util.HashMap;
17 import java.util.concurrent.ExecutionException;
18 import javax.ws.rs.core.UriBuilder;
19 import javax.ws.rs.core.UriInfo;
20 import org.eclipse.jdt.annotation.NonNull;
21 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
22 import org.opendaylight.mdsal.dom.api.DOMDataBroker;
23 import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteOperations;
24 import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteTransaction;
25 import org.opendaylight.restconf.common.errors.RestconfDocumentedException;
26 import org.opendaylight.restconf.nb.rfc8040.NotificationQueryParams;
27 import org.opendaylight.restconf.nb.rfc8040.URLConstants;
28 import org.opendaylight.restconf.nb.rfc8040.databind.DatabindProvider;
29 import org.opendaylight.restconf.nb.rfc8040.monitoring.RestconfStateStreams;
30 import org.opendaylight.restconf.nb.rfc8040.rests.services.impl.RestconfStreamsSubscriptionServiceImpl.HandlersHolder;
31 import org.opendaylight.restconf.nb.rfc8040.rests.utils.RestconfStreamsConstants;
32 import org.opendaylight.restconf.nb.rfc8040.streams.listeners.ListenerAdapter;
33 import org.opendaylight.restconf.nb.rfc8040.streams.listeners.ListenersBroker;
34 import org.opendaylight.restconf.nb.rfc8040.utils.parser.IdentifierCodec;
35 import org.opendaylight.yangtools.yang.common.ErrorTag;
36 import org.opendaylight.yangtools.yang.common.ErrorType;
37 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
38 import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
39 import org.slf4j.Logger;
40 import org.slf4j.LoggerFactory;
43 * Subscribe to stream util class.
45 public abstract class SubscribeToStreamUtil {
47 * Implementation of SubscribeToStreamUtil for Server-sent events.
49 private static final class ServerSentEvents extends SubscribeToStreamUtil {
50 ServerSentEvents(final ListenersBroker listenersBroker) {
51 super(listenersBroker);
55 public URI prepareUriByStreamName(final UriInfo uriInfo, final String streamName) {
56 return uriInfo.getBaseUriBuilder()
57 .replacePath(URLConstants.BASE_PATH + '/' + URLConstants.SSE_SUBPATH + '/' + streamName)
63 * Implementation of SubscribeToStreamUtil for Web sockets.
65 private static final class WebSockets extends SubscribeToStreamUtil {
66 WebSockets(final ListenersBroker listenersBroker) {
67 super(listenersBroker);
71 public URI prepareUriByStreamName(final UriInfo uriInfo, final String streamName) {
72 final String scheme = uriInfo.getAbsolutePath().getScheme();
73 final UriBuilder uriBuilder = uriInfo.getBaseUriBuilder();
76 // Secured HTTP goes to Secured WebSockets
77 uriBuilder.scheme("wss");
81 // Unsecured HTTP and others go to unsecured WebSockets
82 uriBuilder.scheme("ws");
84 return uriBuilder.replacePath(URLConstants.BASE_PATH + '/' + streamName).build();
88 private static final Logger LOG = LoggerFactory.getLogger(SubscribeToStreamUtil.class);
89 private static final Splitter SLASH_SPLITTER = Splitter.on('/');
91 private final @NonNull ListenersBroker listenersBroker;
93 private SubscribeToStreamUtil(final ListenersBroker listenersBroker) {
94 this.listenersBroker = requireNonNull(listenersBroker);
97 public static @NonNull SubscribeToStreamUtil serverSentEvents(final ListenersBroker listenersBroker) {
98 return new ServerSentEvents(listenersBroker);
101 public static @NonNull SubscribeToStreamUtil webSockets(final ListenersBroker listenersBroker) {
102 return new WebSockets(listenersBroker);
105 public final @NonNull ListenersBroker listenersBroker() {
106 return listenersBroker;
110 * Prepare URL from base name and stream name.
112 * @param uriInfo base URL information
113 * @param streamName name of stream for create
116 abstract @NonNull URI prepareUriByStreamName(UriInfo uriInfo, String streamName);
119 * Register listener by streamName in identifier to listen to yang notifications, and put or delete information
120 * about listener to DS according to ietf-restconf-monitoring.
122 * @param identifier Name of the stream.
123 * @param uriInfo URI information.
124 * @param notificationQueryParams Query parameters of notification.
125 * @param handlersHolder Holder of handlers for notifications.
126 * @return Stream location for listening.
128 final @NonNull URI subscribeToYangStream(final String identifier, final UriInfo uriInfo,
129 final NotificationQueryParams notificationQueryParams, final HandlersHolder handlersHolder) {
130 final String streamName = ListenersBroker.createStreamNameFromUri(identifier);
131 if (isNullOrEmpty(streamName)) {
132 throw new RestconfDocumentedException("Stream name is empty.", ErrorType.PROTOCOL, ErrorTag.INVALID_VALUE);
135 final var notificationListenerAdapter = listenersBroker.notificationListenerFor(streamName);
136 if (notificationListenerAdapter == null) {
137 throw new RestconfDocumentedException(String.format("Stream with name %s was not found.", streamName),
138 ErrorType.PROTOCOL, ErrorTag.UNKNOWN_ELEMENT);
141 final URI uri = prepareUriByStreamName(uriInfo, streamName);
142 notificationListenerAdapter.setQueryParams(notificationQueryParams);
143 notificationListenerAdapter.listen(handlersHolder.getNotificationServiceHandler());
144 final DOMDataBroker dataBroker = handlersHolder.getDataBroker();
145 notificationListenerAdapter.setCloseVars(dataBroker, handlersHolder.getDatabindProvider());
146 final MapEntryNode mapToStreams = RestconfStateStreams.notificationStreamEntry(streamName,
147 notificationListenerAdapter.qnames(), notificationListenerAdapter.getStart(),
148 notificationListenerAdapter.getOutputType(), uri);
150 // FIXME: how does this correlate with the transaction notificationListenerAdapter.close() will do?
151 final DOMDataTreeWriteTransaction writeTransaction = dataBroker.newWriteOnlyTransaction();
152 writeDataToDS(writeTransaction, mapToStreams);
153 submitData(writeTransaction);
158 * Register listener by streamName in identifier to listen to data change notifications, and put or delete
159 * information about listener to DS according to ietf-restconf-monitoring.
161 * @param identifier Identifier as stream name.
162 * @param uriInfo Base URI information.
163 * @param notificationQueryParams Query parameters of notification.
164 * @param handlersHolder Holder of handlers for notifications.
165 * @return Location for listening.
167 final URI subscribeToDataStream(final String identifier, final UriInfo uriInfo,
168 final NotificationQueryParams notificationQueryParams, final HandlersHolder handlersHolder) {
169 final Map<String, String> mapOfValues = mapValuesFromUri(identifier);
171 final String datastoreParam = mapOfValues.get(RestconfStreamsConstants.DATASTORE_PARAM_NAME);
172 if (isNullOrEmpty(datastoreParam)) {
173 final String message = "Stream name does not contain datastore value (pattern /datastore=)";
175 throw new RestconfDocumentedException(message, ErrorType.APPLICATION, ErrorTag.MISSING_ATTRIBUTE);
178 // FIXME: this is kept only for compatibility, we are not using this parameter
179 if (isNullOrEmpty(mapOfValues.get(RestconfStreamsConstants.SCOPE_PARAM_NAME))) {
180 final String message = "Stream name does not contain scope value (pattern /scope=)";
182 throw new RestconfDocumentedException(message, ErrorType.APPLICATION, ErrorTag.MISSING_ATTRIBUTE);
185 final String streamName = ListenersBroker.createStreamNameFromUri(identifier);
186 final ListenerAdapter listener = listenersBroker.dataChangeListenerFor(streamName);
187 if (listener == null) {
188 throw new RestconfDocumentedException("No listener found for stream " + streamName,
189 ErrorType.APPLICATION, ErrorTag.DATA_MISSING);
192 listener.setQueryParams(notificationQueryParams);
194 final DOMDataBroker dataBroker = handlersHolder.getDataBroker();
195 final DatabindProvider schemaHandler = handlersHolder.getDatabindProvider();
196 listener.setCloseVars(dataBroker, schemaHandler);
197 listener.listen(dataBroker, LogicalDatastoreType.valueOf(datastoreParam));
199 final URI uri = prepareUriByStreamName(uriInfo, streamName);
200 final EffectiveModelContext schemaContext = schemaHandler.currentContext().modelContext();
201 final String serializedPath = IdentifierCodec.serialize(listener.getPath(), schemaContext);
203 final MapEntryNode mapToStreams = RestconfStateStreams.dataChangeStreamEntry(listener.getPath(),
204 listener.getStart(), listener.getOutputType(), uri, schemaContext, serializedPath);
205 final DOMDataTreeWriteTransaction writeTransaction = dataBroker.newWriteOnlyTransaction();
206 writeDataToDS(writeTransaction, mapToStreams);
207 submitData(writeTransaction);
211 // FIXME: callers are utter duplicates, refactor them
212 private static void writeDataToDS(final DOMDataTreeWriteOperations tx, final MapEntryNode mapToStreams) {
213 // FIXME: use put() here
214 tx.merge(LogicalDatastoreType.OPERATIONAL, RestconfStateStreams.restconfStateStreamPath(mapToStreams.name()),
218 private static void submitData(final DOMDataTreeWriteTransaction readWriteTransaction) {
220 readWriteTransaction.commit().get();
221 } catch (final InterruptedException | ExecutionException e) {
222 throw new RestconfDocumentedException("Problem while putting data to DS.", e);
227 * Prepare map of URI parameter-values.
229 * @param identifier String identification of URI.
230 * @return Map od URI parameters and values.
232 private static Map<String, String> mapValuesFromUri(final String identifier) {
233 final var result = new HashMap<String, String>();
234 for (final String token : SLASH_SPLITTER.split(identifier)) {
235 final String[] paramToken = token.split("=");
236 if (paramToken.length == 2) {
237 result.put(paramToken[0], paramToken[1]);