2 * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.restconf.nb.rfc8040.rests.services.impl;
10 import com.google.common.base.Preconditions;
11 import com.google.common.base.Strings;
13 import java.util.HashMap;
15 import java.util.Optional;
16 import java.util.concurrent.ExecutionException;
17 import javax.ws.rs.core.UriBuilder;
18 import javax.ws.rs.core.UriInfo;
19 import org.eclipse.jdt.annotation.NonNull;
20 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
21 import org.opendaylight.mdsal.dom.api.DOMDataBroker;
22 import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeService;
23 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
24 import org.opendaylight.mdsal.dom.api.DOMDataTreeReadWriteTransaction;
25 import org.opendaylight.mdsal.dom.api.DOMNotificationListener;
26 import org.opendaylight.mdsal.dom.api.DOMTransactionChain;
27 import org.opendaylight.restconf.common.errors.RestconfDocumentedException;
28 import org.opendaylight.restconf.common.errors.RestconfError.ErrorTag;
29 import org.opendaylight.restconf.common.errors.RestconfError.ErrorType;
30 import org.opendaylight.restconf.common.util.DataChangeScope;
31 import org.opendaylight.restconf.nb.rfc8040.Rfc8040.MonitoringModule;
32 import org.opendaylight.restconf.nb.rfc8040.handlers.NotificationServiceHandler;
33 import org.opendaylight.restconf.nb.rfc8040.rests.services.impl.RestconfStreamsSubscriptionServiceImpl.HandlersHolder;
34 import org.opendaylight.restconf.nb.rfc8040.rests.services.impl.RestconfStreamsSubscriptionServiceImpl.NotificationQueryParams;
35 import org.opendaylight.restconf.nb.rfc8040.rests.utils.ResolveEnumUtil;
36 import org.opendaylight.restconf.nb.rfc8040.rests.utils.RestconfStreamsConstants;
37 import org.opendaylight.restconf.nb.rfc8040.streams.listeners.ListenerAdapter;
38 import org.opendaylight.restconf.nb.rfc8040.streams.listeners.ListenersBroker;
39 import org.opendaylight.restconf.nb.rfc8040.streams.listeners.NotificationListenerAdapter;
40 import org.opendaylight.restconf.nb.rfc8040.utils.RestconfConstants;
41 import org.opendaylight.restconf.nb.rfc8040.utils.mapping.RestconfMappingNodeUtil;
42 import org.opendaylight.restconf.nb.rfc8040.utils.parser.IdentifierCodec;
43 import org.opendaylight.yangtools.concepts.ListenerRegistration;
44 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
45 import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
46 import org.opendaylight.yangtools.yang.model.api.Module;
47 import org.opendaylight.yangtools.yang.model.api.stmt.SchemaNodeIdentifier.Absolute;
48 import org.slf4j.Logger;
49 import org.slf4j.LoggerFactory;
52 * Subscribe to stream util class.
54 abstract class SubscribeToStreamUtil {
56 * Implementation of {@link UrlResolver} for Server-sent events.
58 private static final class ServerSentEvents extends SubscribeToStreamUtil {
59 static final ServerSentEvents INSTANCE = new ServerSentEvents();
62 public URI prepareUriByStreamName(final UriInfo uriInfo, final String streamName) {
63 final UriBuilder uriBuilder = uriInfo.getBaseUriBuilder();
64 return uriBuilder.replacePath(RestconfConstants.BASE_URI_PATTERN + '/'
65 + RestconfConstants.NOTIF + '/' + streamName).build();
70 * Implementation of {@link UrlResolver} for Web sockets.
72 private static final class WebSockets extends SubscribeToStreamUtil {
73 static final WebSockets INSTANCE = new WebSockets();
76 public URI prepareUriByStreamName(final UriInfo uriInfo, final String streamName) {
77 final String scheme = uriInfo.getAbsolutePath().getScheme();
78 final UriBuilder uriBuilder = uriInfo.getBaseUriBuilder();
81 // Secured HTTP goes to Secured WebSockets
82 uriBuilder.scheme("wss");
86 // Unsecured HTTP and others go to unsecured WebSockets
87 uriBuilder.scheme("ws");
89 return uriBuilder.replacePath(RestconfConstants.BASE_URI_PATTERN + '/' + streamName).build();
94 private static final Logger LOG = LoggerFactory.getLogger(SubscribeToStreamUtil.class);
/**
 * Package-private constructor. Instances are handed out through {@link #serverSentEvents()} and
 * {@link #webSockets()}.
 */
SubscribeToStreamUtil() {
    // Hidden on purpose
}
100 static SubscribeToStreamUtil serverSentEvents() {
101 return ServerSentEvents.INSTANCE;
104 static SubscribeToStreamUtil webSockets() {
105 return WebSockets.INSTANCE;
109 * Prepare URL from base name and stream name.
111 * @param uriInfo base URL information
112 * @param streamName name of stream for create
115 abstract @NonNull URI prepareUriByStreamName(UriInfo uriInfo, String streamName);
118 * Register listener by streamName in identifier to listen to yang notifications, and put or delete information
119 * about listener to DS according to ietf-restconf-monitoring.
121 * @param identifier Name of the stream.
122 * @param uriInfo URI information.
123 * @param notificationQueryParams Query parameters of notification.
124 * @param handlersHolder Holder of handlers for notifications.
125 * @param urlResolver Resolver for proper implementation. Possibilities is WS or SSE.
126 * @return Stream location for listening.
128 final @NonNull URI subscribeToYangStream(final String identifier, final UriInfo uriInfo,
129 final NotificationQueryParams notificationQueryParams, final HandlersHolder handlersHolder) {
130 final String streamName = ListenersBroker.createStreamNameFromUri(identifier);
131 if (Strings.isNullOrEmpty(streamName)) {
132 throw new RestconfDocumentedException("Stream name is empty.", ErrorType.PROTOCOL, ErrorTag.INVALID_VALUE);
134 final Optional<NotificationListenerAdapter> notificationListenerAdapter =
135 ListenersBroker.getInstance().getNotificationListenerFor(streamName);
137 if (!notificationListenerAdapter.isPresent()) {
138 throw new RestconfDocumentedException(String.format(
139 "Stream with name %s was not found.", streamName),
141 ErrorTag.UNKNOWN_ELEMENT);
144 final DOMTransactionChain transactionChain = handlersHolder.getTransactionChainHandler().get();
145 final DOMDataTreeReadWriteTransaction writeTransaction = transactionChain.newReadWriteTransaction();
146 final EffectiveModelContext schemaContext = handlersHolder.getSchemaHandler().get();
148 final URI uri = prepareUriByStreamName(uriInfo, streamName);
149 registerToListenNotification(
150 notificationListenerAdapter.get(), handlersHolder.getNotificationServiceHandler());
151 notificationListenerAdapter.get().setQueryParams(
152 notificationQueryParams.getStart(),
153 notificationQueryParams.getStop().orElse(null),
154 notificationQueryParams.getFilter().orElse(null),
155 false, notificationQueryParams.isSkipNotificationData());
156 notificationListenerAdapter.get().setCloseVars(
157 handlersHolder.getTransactionChainHandler(), handlersHolder.getSchemaHandler());
158 final MapEntryNode mapToStreams = RestconfMappingNodeUtil.mapYangNotificationStreamByIetfRestconfMonitoring(
159 notificationListenerAdapter.get().getSchemaPath().lastNodeIdentifier(),
160 schemaContext.getNotifications(), notificationQueryParams.getStart(),
161 notificationListenerAdapter.get().getOutputType(), uri, getMonitoringModule(schemaContext));
162 writeDataToDS(schemaContext,
163 notificationListenerAdapter.get().getSchemaPath().lastNodeIdentifier().getLocalName(), writeTransaction,
165 submitData(writeTransaction);
166 transactionChain.close();
171 * Register listener by streamName in identifier to listen to data change notifications, and put or delete
172 * information about listener to DS according to ietf-restconf-monitoring.
174 * @param identifier Identifier as stream name.
175 * @param uriInfo Base URI information.
176 * @param notificationQueryParams Query parameters of notification.
177 * @param handlersHolder Holder of handlers for notifications.
178 * @return Location for listening.
180 final URI subscribeToDataStream(final String identifier, final UriInfo uriInfo,
181 final NotificationQueryParams notificationQueryParams, final HandlersHolder handlersHolder) {
182 final Map<String, String> mapOfValues = mapValuesFromUri(identifier);
183 final LogicalDatastoreType datastoreType = parseURIEnum(
184 LogicalDatastoreType.class,
185 mapOfValues.get(RestconfStreamsConstants.DATASTORE_PARAM_NAME));
186 if (datastoreType == null) {
187 final String message = "Stream name doesn't contain datastore value (pattern /datastore=)";
189 throw new RestconfDocumentedException(message, ErrorType.APPLICATION, ErrorTag.MISSING_ATTRIBUTE);
192 final DataChangeScope scope = parseURIEnum(
193 DataChangeScope.class,
194 mapOfValues.get(RestconfStreamsConstants.SCOPE_PARAM_NAME));
196 final String message = "Stream name doesn't contains datastore value (pattern /scope=)";
198 throw new RestconfDocumentedException(message, ErrorType.APPLICATION, ErrorTag.MISSING_ATTRIBUTE);
201 final String streamName = ListenersBroker.createStreamNameFromUri(identifier);
202 final Optional<ListenerAdapter> listener = ListenersBroker.getInstance().getDataChangeListenerFor(streamName);
203 Preconditions.checkArgument(listener.isPresent(), "Listener doesn't exist : " + streamName);
205 listener.get().setQueryParams(
206 notificationQueryParams.getStart(),
207 notificationQueryParams.getStop().orElse(null),
208 notificationQueryParams.getFilter().orElse(null),
209 false, notificationQueryParams.isSkipNotificationData());
210 listener.get().setCloseVars(handlersHolder.getTransactionChainHandler(), handlersHolder.getSchemaHandler());
211 registration(datastoreType, listener.get(), handlersHolder.getDomDataBrokerHandler().get());
213 final URI uri = prepareUriByStreamName(uriInfo, streamName);
214 final DOMTransactionChain transactionChain = handlersHolder.getTransactionChainHandler().get();
215 final DOMDataTreeReadWriteTransaction writeTransaction = transactionChain.newReadWriteTransaction();
216 final EffectiveModelContext schemaContext = handlersHolder.getSchemaHandler().get();
218 final MapEntryNode mapToStreams =
219 RestconfMappingNodeUtil.mapDataChangeNotificationStreamByIetfRestconfMonitoring(listener.get().getPath(),
220 notificationQueryParams.getStart(), listener.get().getOutputType(), uri,
221 getMonitoringModule(schemaContext), schemaContext);
222 writeDataToDS(schemaContext, listener.get().getPath().getLastPathArgument().getNodeType().getLocalName(),
223 writeTransaction, mapToStreams);
224 submitData(writeTransaction);
225 transactionChain.close();
229 static Module getMonitoringModule(final EffectiveModelContext schemaContext) {
230 return schemaContext.findModule(MonitoringModule.MODULE_QNAME).orElse(null);
233 private static void writeDataToDS(final EffectiveModelContext schemaContext, final String name,
234 final DOMDataTreeReadWriteTransaction readWriteTransaction, final MapEntryNode mapToStreams) {
235 readWriteTransaction.merge(LogicalDatastoreType.OPERATIONAL,
236 // FIXME: do not use IdentifierCodec here
237 IdentifierCodec.deserialize(MonitoringModule.PATH_TO_STREAM_WITHOUT_KEY + name, schemaContext),
241 private static void submitData(final DOMDataTreeReadWriteTransaction readWriteTransaction) {
243 readWriteTransaction.commit().get();
244 } catch (final InterruptedException | ExecutionException e) {
245 throw new RestconfDocumentedException("Problem while putting data to DS.", e);
250 * Prepare map of URI parameter-values.
252 * @param identifier String identification of URI.
253 * @return Map od URI parameters and values.
255 private static Map<String, String> mapValuesFromUri(final String identifier) {
256 final HashMap<String, String> result = new HashMap<>();
257 for (final String token : RestconfConstants.SLASH_SPLITTER.split(identifier)) {
258 final String[] paramToken = token.split("=");
259 if (paramToken.length == 2) {
260 result.put(paramToken[0], paramToken[1]);
267 * Register data change listener in DOM data broker and set it to listener on stream.
269 * @param datastore {@link LogicalDatastoreType}
270 * @param listener listener on specific stream
271 * @param domDataBroker data broker for register data change listener
273 private static void registration(final LogicalDatastoreType datastore, final ListenerAdapter listener,
274 final DOMDataBroker domDataBroker) {
275 if (listener.isListening()) {
279 final DOMDataTreeChangeService changeService = domDataBroker.getExtensions()
280 .getInstance(DOMDataTreeChangeService.class);
281 if (changeService == null) {
282 throw new UnsupportedOperationException("DOMDataBroker does not support the DOMDataTreeChangeService");
285 final DOMDataTreeIdentifier root = new DOMDataTreeIdentifier(datastore, listener.getPath());
286 final ListenerRegistration<ListenerAdapter> registration =
287 changeService.registerDataTreeChangeListener(root, listener);
288 listener.setRegistration(registration);
291 private static void registerToListenNotification(final NotificationListenerAdapter listener,
292 final NotificationServiceHandler notificationServiceHandler) {
293 if (listener.isListening()) {
297 final Absolute path = listener.getSchemaPath();
298 final ListenerRegistration<DOMNotificationListener> registration =
299 notificationServiceHandler.get().registerNotificationListener(listener, path);
300 listener.setRegistration(registration);
304 * Parse out enumeration from URI.
306 * @param clazz Target enumeration type.
307 * @param value String representation of enumeration value.
308 * @return Parsed enumeration type.
310 private static <T> T parseURIEnum(final Class<T> clazz, final String value) {
311 if (value == null || value.equals("")) {
314 return ResolveEnumUtil.resolveEnum(clazz, value);