ee96e3e36f6152c1773a657d436e1b1b80c6b37f
[netconf.git] / restconf / restconf-nb-rfc8040 / src / main / java / org / opendaylight / restconf / nb / rfc8040 / rests / services / impl / SubscribeToStreamUtil.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.restconf.nb.rfc8040.rests.services.impl;
9
10 import com.google.common.base.Preconditions;
11 import com.google.common.base.Strings;
12 import java.net.URI;
13 import java.util.HashMap;
14 import java.util.Map;
15 import java.util.Optional;
16 import java.util.concurrent.ExecutionException;
17 import javax.ws.rs.core.UriBuilder;
18 import javax.ws.rs.core.UriInfo;
19 import org.eclipse.jdt.annotation.NonNull;
20 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
21 import org.opendaylight.mdsal.dom.api.DOMDataBroker;
22 import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeService;
23 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
24 import org.opendaylight.mdsal.dom.api.DOMDataTreeReadWriteTransaction;
25 import org.opendaylight.mdsal.dom.api.DOMNotificationListener;
26 import org.opendaylight.mdsal.dom.api.DOMTransactionChain;
27 import org.opendaylight.restconf.common.errors.RestconfDocumentedException;
28 import org.opendaylight.restconf.common.errors.RestconfError.ErrorTag;
29 import org.opendaylight.restconf.common.errors.RestconfError.ErrorType;
30 import org.opendaylight.restconf.common.util.DataChangeScope;
31 import org.opendaylight.restconf.nb.rfc8040.Rfc8040.MonitoringModule;
32 import org.opendaylight.restconf.nb.rfc8040.handlers.NotificationServiceHandler;
33 import org.opendaylight.restconf.nb.rfc8040.rests.services.impl.RestconfStreamsSubscriptionServiceImpl.HandlersHolder;
34 import org.opendaylight.restconf.nb.rfc8040.rests.services.impl.RestconfStreamsSubscriptionServiceImpl.NotificationQueryParams;
35 import org.opendaylight.restconf.nb.rfc8040.rests.utils.ResolveEnumUtil;
36 import org.opendaylight.restconf.nb.rfc8040.rests.utils.RestconfStreamsConstants;
37 import org.opendaylight.restconf.nb.rfc8040.streams.listeners.ListenerAdapter;
38 import org.opendaylight.restconf.nb.rfc8040.streams.listeners.ListenersBroker;
39 import org.opendaylight.restconf.nb.rfc8040.streams.listeners.NotificationListenerAdapter;
40 import org.opendaylight.restconf.nb.rfc8040.utils.RestconfConstants;
41 import org.opendaylight.restconf.nb.rfc8040.utils.mapping.RestconfMappingNodeUtil;
42 import org.opendaylight.restconf.nb.rfc8040.utils.parser.IdentifierCodec;
43 import org.opendaylight.yangtools.concepts.ListenerRegistration;
44 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
45 import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
46 import org.opendaylight.yangtools.yang.model.api.Module;
47 import org.opendaylight.yangtools.yang.model.api.stmt.SchemaNodeIdentifier.Absolute;
48 import org.slf4j.Logger;
49 import org.slf4j.LoggerFactory;
50
51 /**
52  * Subscribe to stream util class.
53  */
54 abstract class SubscribeToStreamUtil {
55     /**
56      * Implementation of {@link UrlResolver} for Server-sent events.
57      */
58     private static final class ServerSentEvents extends SubscribeToStreamUtil {
59         static final ServerSentEvents INSTANCE = new ServerSentEvents();
60
61         @Override
62         public URI prepareUriByStreamName(final UriInfo uriInfo, final String streamName) {
63             final UriBuilder uriBuilder = uriInfo.getBaseUriBuilder();
64             return uriBuilder.replacePath(RestconfConstants.BASE_URI_PATTERN + '/'
65                     + RestconfConstants.NOTIF + '/' + streamName).build();
66         }
67     }
68
69     /**
70      * Implementation of {@link UrlResolver} for Web sockets.
71      */
72     private static final class WebSockets extends SubscribeToStreamUtil {
73         static final WebSockets INSTANCE = new WebSockets();
74
75         @Override
76         public URI prepareUriByStreamName(final UriInfo uriInfo, final String streamName) {
77             final String scheme = uriInfo.getAbsolutePath().getScheme();
78             final UriBuilder uriBuilder = uriInfo.getBaseUriBuilder();
79             switch (scheme) {
80                 case "https":
81                     // Secured HTTP goes to Secured WebSockets
82                     uriBuilder.scheme("wss");
83                     break;
84                 case "http":
85                 default:
86                     // Unsecured HTTP and others go to unsecured WebSockets
87                     uriBuilder.scheme("ws");
88             }
89             return uriBuilder.replacePath(RestconfConstants.BASE_URI_PATTERN + '/' + streamName).build();
90         }
91     }
92
93
94     private static final Logger LOG = LoggerFactory.getLogger(SubscribeToStreamUtil.class);
95
96     SubscribeToStreamUtil() {
97         // Hidden on purpose
98     }
99
100     static SubscribeToStreamUtil serverSentEvents() {
101         return ServerSentEvents.INSTANCE;
102     }
103
104     static SubscribeToStreamUtil webSockets() {
105         return WebSockets.INSTANCE;
106     }
107
108     /**
109      * Prepare URL from base name and stream name.
110      *
111      * @param uriInfo base URL information
112      * @param streamName name of stream for create
113      * @return final URL
114      */
115     abstract @NonNull URI prepareUriByStreamName(UriInfo uriInfo, String streamName);
116
117     /**
118      * Register listener by streamName in identifier to listen to yang notifications, and put or delete information
119      * about listener to DS according to ietf-restconf-monitoring.
120      *
121      * @param identifier              Name of the stream.
122      * @param uriInfo                 URI information.
123      * @param notificationQueryParams Query parameters of notification.
124      * @param handlersHolder          Holder of handlers for notifications.
125      * @param urlResolver             Resolver for proper implementation. Possibilities is WS or SSE.
126      * @return Stream location for listening.
127      */
128     final @NonNull URI subscribeToYangStream(final String identifier, final UriInfo uriInfo,
129             final NotificationQueryParams notificationQueryParams, final HandlersHolder handlersHolder) {
130         final String streamName = ListenersBroker.createStreamNameFromUri(identifier);
131         if (Strings.isNullOrEmpty(streamName)) {
132             throw new RestconfDocumentedException("Stream name is empty.", ErrorType.PROTOCOL, ErrorTag.INVALID_VALUE);
133         }
134         final Optional<NotificationListenerAdapter> notificationListenerAdapter =
135                 ListenersBroker.getInstance().getNotificationListenerFor(streamName);
136
137         if (!notificationListenerAdapter.isPresent()) {
138             throw new RestconfDocumentedException(String.format(
139                     "Stream with name %s was not found.", streamName),
140                     ErrorType.PROTOCOL,
141                     ErrorTag.UNKNOWN_ELEMENT);
142         }
143
144         final DOMTransactionChain transactionChain = handlersHolder.getTransactionChainHandler().get();
145         final DOMDataTreeReadWriteTransaction writeTransaction = transactionChain.newReadWriteTransaction();
146         final EffectiveModelContext schemaContext = handlersHolder.getSchemaHandler().get();
147
148         final URI uri = prepareUriByStreamName(uriInfo, streamName);
149         registerToListenNotification(
150                 notificationListenerAdapter.get(), handlersHolder.getNotificationServiceHandler());
151         notificationListenerAdapter.get().setQueryParams(
152                 notificationQueryParams.getStart(),
153                 notificationQueryParams.getStop().orElse(null),
154                 notificationQueryParams.getFilter().orElse(null),
155                 false, notificationQueryParams.isSkipNotificationData());
156         notificationListenerAdapter.get().setCloseVars(
157                 handlersHolder.getTransactionChainHandler(), handlersHolder.getSchemaHandler());
158         final MapEntryNode mapToStreams = RestconfMappingNodeUtil.mapYangNotificationStreamByIetfRestconfMonitoring(
159                     notificationListenerAdapter.get().getSchemaPath().lastNodeIdentifier(),
160                     schemaContext.getNotifications(), notificationQueryParams.getStart(),
161                     notificationListenerAdapter.get().getOutputType(), uri, getMonitoringModule(schemaContext));
162         writeDataToDS(schemaContext,
163             notificationListenerAdapter.get().getSchemaPath().lastNodeIdentifier().getLocalName(), writeTransaction,
164             mapToStreams);
165         submitData(writeTransaction);
166         transactionChain.close();
167         return uri;
168     }
169
170     /**
171      * Register listener by streamName in identifier to listen to data change notifications, and put or delete
172      * information about listener to DS according to ietf-restconf-monitoring.
173      *
174      * @param identifier              Identifier as stream name.
175      * @param uriInfo                 Base URI information.
176      * @param notificationQueryParams Query parameters of notification.
177      * @param handlersHolder          Holder of handlers for notifications.
178      * @return Location for listening.
179      */
180     final URI subscribeToDataStream(final String identifier, final UriInfo uriInfo,
181             final NotificationQueryParams notificationQueryParams, final HandlersHolder handlersHolder) {
182         final Map<String, String> mapOfValues = mapValuesFromUri(identifier);
183         final LogicalDatastoreType datastoreType = parseURIEnum(
184                 LogicalDatastoreType.class,
185                 mapOfValues.get(RestconfStreamsConstants.DATASTORE_PARAM_NAME));
186         if (datastoreType == null) {
187             final String message = "Stream name doesn't contain datastore value (pattern /datastore=)";
188             LOG.debug(message);
189             throw new RestconfDocumentedException(message, ErrorType.APPLICATION, ErrorTag.MISSING_ATTRIBUTE);
190         }
191
192         final DataChangeScope scope = parseURIEnum(
193                 DataChangeScope.class,
194                 mapOfValues.get(RestconfStreamsConstants.SCOPE_PARAM_NAME));
195         if (scope == null) {
196             final String message = "Stream name doesn't contains datastore value (pattern /scope=)";
197             LOG.warn(message);
198             throw new RestconfDocumentedException(message, ErrorType.APPLICATION, ErrorTag.MISSING_ATTRIBUTE);
199         }
200
201         final String streamName = ListenersBroker.createStreamNameFromUri(identifier);
202         final Optional<ListenerAdapter> listener = ListenersBroker.getInstance().getDataChangeListenerFor(streamName);
203         Preconditions.checkArgument(listener.isPresent(), "Listener doesn't exist : " + streamName);
204
205         listener.get().setQueryParams(
206                 notificationQueryParams.getStart(),
207                 notificationQueryParams.getStop().orElse(null),
208                 notificationQueryParams.getFilter().orElse(null),
209                 false, notificationQueryParams.isSkipNotificationData());
210         listener.get().setCloseVars(handlersHolder.getTransactionChainHandler(), handlersHolder.getSchemaHandler());
211         registration(datastoreType, listener.get(), handlersHolder.getDomDataBrokerHandler().get());
212
213         final URI uri = prepareUriByStreamName(uriInfo, streamName);
214         final DOMTransactionChain transactionChain = handlersHolder.getTransactionChainHandler().get();
215         final DOMDataTreeReadWriteTransaction writeTransaction = transactionChain.newReadWriteTransaction();
216         final EffectiveModelContext schemaContext = handlersHolder.getSchemaHandler().get();
217
218         final MapEntryNode mapToStreams =
219             RestconfMappingNodeUtil.mapDataChangeNotificationStreamByIetfRestconfMonitoring(listener.get().getPath(),
220                 notificationQueryParams.getStart(), listener.get().getOutputType(), uri,
221                 getMonitoringModule(schemaContext), schemaContext);
222         writeDataToDS(schemaContext, listener.get().getPath().getLastPathArgument().getNodeType().getLocalName(),
223                 writeTransaction, mapToStreams);
224         submitData(writeTransaction);
225         transactionChain.close();
226         return uri;
227     }
228
229     static Module getMonitoringModule(final EffectiveModelContext schemaContext) {
230         return schemaContext.findModule(MonitoringModule.MODULE_QNAME).orElse(null);
231     }
232
233     private static void writeDataToDS(final EffectiveModelContext schemaContext, final String name,
234             final DOMDataTreeReadWriteTransaction readWriteTransaction, final MapEntryNode mapToStreams) {
235         readWriteTransaction.merge(LogicalDatastoreType.OPERATIONAL,
236             // FIXME: do not use IdentifierCodec here
237             IdentifierCodec.deserialize(MonitoringModule.PATH_TO_STREAM_WITHOUT_KEY + name, schemaContext),
238             mapToStreams);
239     }
240
241     private static void submitData(final DOMDataTreeReadWriteTransaction readWriteTransaction) {
242         try {
243             readWriteTransaction.commit().get();
244         } catch (final InterruptedException | ExecutionException e) {
245             throw new RestconfDocumentedException("Problem while putting data to DS.", e);
246         }
247     }
248
249     /**
250      * Prepare map of URI parameter-values.
251      *
252      * @param identifier String identification of URI.
253      * @return Map od URI parameters and values.
254      */
255     private static Map<String, String> mapValuesFromUri(final String identifier) {
256         final HashMap<String, String> result = new HashMap<>();
257         for (final String token : RestconfConstants.SLASH_SPLITTER.split(identifier)) {
258             final String[] paramToken = token.split("=");
259             if (paramToken.length == 2) {
260                 result.put(paramToken[0], paramToken[1]);
261             }
262         }
263         return result;
264     }
265
266     /**
267      * Register data change listener in DOM data broker and set it to listener on stream.
268      *
269      * @param datastore     {@link LogicalDatastoreType}
270      * @param listener      listener on specific stream
271      * @param domDataBroker data broker for register data change listener
272      */
273     private static void registration(final LogicalDatastoreType datastore, final ListenerAdapter listener,
274             final DOMDataBroker domDataBroker) {
275         if (listener.isListening()) {
276             return;
277         }
278
279         final DOMDataTreeChangeService changeService = domDataBroker.getExtensions()
280                 .getInstance(DOMDataTreeChangeService.class);
281         if (changeService == null) {
282             throw new UnsupportedOperationException("DOMDataBroker does not support the DOMDataTreeChangeService");
283         }
284
285         final DOMDataTreeIdentifier root = new DOMDataTreeIdentifier(datastore, listener.getPath());
286         final ListenerRegistration<ListenerAdapter> registration =
287                 changeService.registerDataTreeChangeListener(root, listener);
288         listener.setRegistration(registration);
289     }
290
291     private static void registerToListenNotification(final NotificationListenerAdapter listener,
292             final NotificationServiceHandler notificationServiceHandler) {
293         if (listener.isListening()) {
294             return;
295         }
296
297         final Absolute path = listener.getSchemaPath();
298         final ListenerRegistration<DOMNotificationListener> registration =
299                 notificationServiceHandler.get().registerNotificationListener(listener, path);
300         listener.setRegistration(registration);
301     }
302
303     /**
304      * Parse out enumeration from URI.
305      *
306      * @param clazz Target enumeration type.
307      * @param value String representation of enumeration value.
308      * @return Parsed enumeration type.
309      */
310     private static <T> T parseURIEnum(final Class<T> clazz, final String value) {
311         if (value == null || value.equals("")) {
312             return null;
313         }
314         return ResolveEnumUtil.resolveEnum(clazz, value);
315     }
316 }