b95059beb5e3efb4198361ffab02c8df0050fe72
[netconf.git] / restconf / restconf-nb-rfc8040 / src / main / java / org / opendaylight / restconf / nb / rfc8040 / rests / services / impl / SubscribeToStreamUtil.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.restconf.nb.rfc8040.rests.services.impl;
9
10 import static com.google.common.base.Strings.isNullOrEmpty;
11
12 import java.net.URI;
13 import java.util.HashMap;
14 import java.util.Map;
15 import java.util.concurrent.ExecutionException;
16 import javax.ws.rs.core.UriBuilder;
17 import javax.ws.rs.core.UriInfo;
18 import org.eclipse.jdt.annotation.NonNull;
19 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
20 import org.opendaylight.mdsal.dom.api.DOMDataBroker;
21 import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteOperations;
22 import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteTransaction;
23 import org.opendaylight.restconf.common.errors.RestconfDocumentedException;
24 import org.opendaylight.restconf.nb.rfc8040.NotificationQueryParams;
25 import org.opendaylight.restconf.nb.rfc8040.Rfc8040;
26 import org.opendaylight.restconf.nb.rfc8040.handlers.SchemaContextHandler;
27 import org.opendaylight.restconf.nb.rfc8040.rests.services.impl.RestconfStreamsSubscriptionServiceImpl.HandlersHolder;
28 import org.opendaylight.restconf.nb.rfc8040.rests.utils.RestconfStreamsConstants;
29 import org.opendaylight.restconf.nb.rfc8040.streams.listeners.ListenerAdapter;
30 import org.opendaylight.restconf.nb.rfc8040.streams.listeners.ListenersBroker;
31 import org.opendaylight.restconf.nb.rfc8040.streams.listeners.NotificationListenerAdapter;
32 import org.opendaylight.restconf.nb.rfc8040.utils.RestconfConstants;
33 import org.opendaylight.restconf.nb.rfc8040.utils.mapping.RestconfMappingNodeUtil;
34 import org.opendaylight.restconf.nb.rfc8040.utils.parser.IdentifierCodec;
35 import org.opendaylight.yangtools.yang.common.ErrorTag;
36 import org.opendaylight.yangtools.yang.common.ErrorType;
37 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
38 import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
39 import org.slf4j.Logger;
40 import org.slf4j.LoggerFactory;
41
42 /**
43  * Subscribe to stream util class.
44  */
45 abstract class SubscribeToStreamUtil {
46     /**
47      * Implementation of SubscribeToStreamUtil for Server-sent events.
48      */
49     private static final class ServerSentEvents extends SubscribeToStreamUtil {
50         static final ServerSentEvents INSTANCE = new ServerSentEvents();
51
52         @Override
53         public URI prepareUriByStreamName(final UriInfo uriInfo, final String streamName) {
54             final UriBuilder uriBuilder = uriInfo.getBaseUriBuilder();
55             return uriBuilder.replacePath(RestconfConstants.BASE_URI_PATTERN + '/'
56                     + RestconfConstants.NOTIF + '/' + streamName).build();
57         }
58     }
59
60     /**
61      * Implementation of SubscribeToStreamUtil for Web sockets.
62      */
63     private static final class WebSockets extends SubscribeToStreamUtil {
64         static final WebSockets INSTANCE = new WebSockets();
65
66         @Override
67         public URI prepareUriByStreamName(final UriInfo uriInfo, final String streamName) {
68             final String scheme = uriInfo.getAbsolutePath().getScheme();
69             final UriBuilder uriBuilder = uriInfo.getBaseUriBuilder();
70             switch (scheme) {
71                 case "https":
72                     // Secured HTTP goes to Secured WebSockets
73                     uriBuilder.scheme("wss");
74                     break;
75                 case "http":
76                 default:
77                     // Unsecured HTTP and others go to unsecured WebSockets
78                     uriBuilder.scheme("ws");
79             }
80             return uriBuilder.replacePath(RestconfConstants.BASE_URI_PATTERN + '/' + streamName).build();
81         }
82     }
83
84
    // SLF4J logger bound to this class.
    private static final Logger LOG = LoggerFactory.getLogger(SubscribeToStreamUtil.class);
86
87     SubscribeToStreamUtil() {
88         // Hidden on purpose
89     }
90
91     static SubscribeToStreamUtil serverSentEvents() {
92         return ServerSentEvents.INSTANCE;
93     }
94
95     static SubscribeToStreamUtil webSockets() {
96         return WebSockets.INSTANCE;
97     }
98
    /**
     * Prepare the location URI of a stream from the request's base URI and the stream name.
     *
     * <p>
     * Each transport variant supplies its own layout: the Server-sent events variant replaces the path with
     * {@code BASE_URI_PATTERN/NOTIF/streamName}, while the Web sockets variant additionally switches the scheme to
     * a WebSocket scheme and replaces the path with {@code BASE_URI_PATTERN/streamName}.
     *
     * @param uriInfo base URL information
     * @param streamName name of the stream the URI is created for
     * @return final URL
     */
    abstract @NonNull URI prepareUriByStreamName(UriInfo uriInfo, String streamName);
107
108     /**
109      * Register listener by streamName in identifier to listen to yang notifications, and put or delete information
110      * about listener to DS according to ietf-restconf-monitoring.
111      *
112      * @param identifier              Name of the stream.
113      * @param uriInfo                 URI information.
114      * @param notificationQueryParams Query parameters of notification.
115      * @param handlersHolder          Holder of handlers for notifications.
116      * @return Stream location for listening.
117      */
118     final @NonNull URI subscribeToYangStream(final String identifier, final UriInfo uriInfo,
119             final NotificationQueryParams notificationQueryParams, final HandlersHolder handlersHolder) {
120         final String streamName = ListenersBroker.createStreamNameFromUri(identifier);
121         if (isNullOrEmpty(streamName)) {
122             throw new RestconfDocumentedException("Stream name is empty.", ErrorType.PROTOCOL, ErrorTag.INVALID_VALUE);
123         }
124
125         final NotificationListenerAdapter notificationListenerAdapter = ListenersBroker.getInstance()
126             .getNotificationListenerFor(streamName)
127             .orElseThrow(() -> new RestconfDocumentedException(
128                 String.format("Stream with name %s was not found.", streamName),
129                 ErrorType.PROTOCOL, ErrorTag.UNKNOWN_ELEMENT));
130
131         final EffectiveModelContext schemaContext = handlersHolder.getSchemaHandler().get();
132         final URI uri = prepareUriByStreamName(uriInfo, streamName);
133         notificationListenerAdapter.listen(handlersHolder.getNotificationServiceHandler());
134         notificationListenerAdapter.setQueryParams(
135                 notificationQueryParams.startTime(),
136                 notificationQueryParams.stopTime(),
137                 notificationQueryParams.filter(),
138                 notificationQueryParams.leafNodesOnly(),
139                 notificationQueryParams.skipNotificationData());
140         final DOMDataBroker dataBroker = handlersHolder.getDataBroker();
141         notificationListenerAdapter.setCloseVars(dataBroker, handlersHolder.getSchemaHandler());
142         final MapEntryNode mapToStreams = RestconfMappingNodeUtil.mapYangNotificationStreamByIetfRestconfMonitoring(
143                     notificationListenerAdapter.getSchemaPath().lastNodeIdentifier(),
144                     schemaContext.getNotifications(), notificationListenerAdapter.getStart(),
145                     notificationListenerAdapter.getOutputType(), uri);
146
147         // FIXME: how does this correlate with the transaction notificationListenerAdapter.close() will do?
148         final DOMDataTreeWriteTransaction writeTransaction = dataBroker.newWriteOnlyTransaction();
149         writeDataToDS(writeTransaction, mapToStreams);
150         submitData(writeTransaction);
151         return uri;
152     }
153
154     /**
155      * Register listener by streamName in identifier to listen to data change notifications, and put or delete
156      * information about listener to DS according to ietf-restconf-monitoring.
157      *
158      * @param identifier              Identifier as stream name.
159      * @param uriInfo                 Base URI information.
160      * @param notificationQueryParams Query parameters of notification.
161      * @param handlersHolder          Holder of handlers for notifications.
162      * @return Location for listening.
163      */
164     final URI subscribeToDataStream(final String identifier, final UriInfo uriInfo,
165             final NotificationQueryParams notificationQueryParams, final HandlersHolder handlersHolder) {
166         final Map<String, String> mapOfValues = mapValuesFromUri(identifier);
167
168         final String datastoreParam = mapOfValues.get(RestconfStreamsConstants.DATASTORE_PARAM_NAME);
169         if (isNullOrEmpty(datastoreParam)) {
170             final String message = "Stream name does not contain datastore value (pattern /datastore=)";
171             LOG.debug(message);
172             throw new RestconfDocumentedException(message, ErrorType.APPLICATION, ErrorTag.MISSING_ATTRIBUTE);
173         }
174
175         // FIXME: this is kept only for compatibility, we are not using this parameter
176         if (isNullOrEmpty(mapOfValues.get(RestconfStreamsConstants.SCOPE_PARAM_NAME))) {
177             final String message = "Stream name does not contain scope value (pattern /scope=)";
178             LOG.warn(message);
179             throw new RestconfDocumentedException(message, ErrorType.APPLICATION, ErrorTag.MISSING_ATTRIBUTE);
180         }
181
182         final String streamName = ListenersBroker.createStreamNameFromUri(identifier);
183         final ListenerAdapter listener = ListenersBroker.getInstance().getDataChangeListenerFor(streamName)
184             .orElseThrow(() -> new RestconfDocumentedException("No listener found for stream " + streamName,
185                 ErrorType.APPLICATION, ErrorTag.DATA_MISSING));
186
187         listener.setQueryParams(
188                 notificationQueryParams.startTime(),
189                 notificationQueryParams.stopTime(),
190                 notificationQueryParams.filter(),
191                 notificationQueryParams.leafNodesOnly(),
192                 notificationQueryParams.skipNotificationData());
193
194         final DOMDataBroker dataBroker = handlersHolder.getDataBroker();
195         final SchemaContextHandler schemaHandler = handlersHolder.getSchemaHandler();
196         listener.setCloseVars(dataBroker, schemaHandler);
197         listener.listen(dataBroker, LogicalDatastoreType.valueOf(datastoreParam));
198
199         final URI uri = prepareUriByStreamName(uriInfo, streamName);
200         final EffectiveModelContext schemaContext = schemaHandler.get();
201         final String serializedPath = IdentifierCodec.serialize(listener.getPath(), schemaContext);
202
203         final MapEntryNode mapToStreams =
204             RestconfMappingNodeUtil.mapDataChangeNotificationStreamByIetfRestconfMonitoring(listener.getPath(),
205                 listener.getStart(), listener.getOutputType(), uri, schemaContext, serializedPath);
206         final DOMDataTreeWriteTransaction writeTransaction = dataBroker.newWriteOnlyTransaction();
207         writeDataToDS(writeTransaction, mapToStreams);
208         submitData(writeTransaction);
209         return uri;
210     }
211
212     // FIXME: callers are utter duplicates, refactor them
213     private static void writeDataToDS(final DOMDataTreeWriteOperations tx, final MapEntryNode mapToStreams) {
214         // FIXME: use put() here
215         tx.merge(LogicalDatastoreType.OPERATIONAL, Rfc8040.restconfStateStreamPath(mapToStreams.getIdentifier()),
216             mapToStreams);
217     }
218
219     private static void submitData(final DOMDataTreeWriteTransaction readWriteTransaction) {
220         try {
221             readWriteTransaction.commit().get();
222         } catch (final InterruptedException | ExecutionException e) {
223             throw new RestconfDocumentedException("Problem while putting data to DS.", e);
224         }
225     }
226
227     /**
228      * Prepare map of URI parameter-values.
229      *
230      * @param identifier String identification of URI.
231      * @return Map od URI parameters and values.
232      */
233     private static Map<String, String> mapValuesFromUri(final String identifier) {
234         final HashMap<String, String> result = new HashMap<>();
235         for (final String token : RestconfConstants.SLASH_SPLITTER.split(identifier)) {
236             final String[] paramToken = token.split("=");
237             if (paramToken.length == 2) {
238                 result.put(paramToken[0], paramToken[1]);
239             }
240         }
241         return result;
242     }
243 }