Remove NotificationServiceHandler
[netconf.git] / restconf / restconf-nb-rfc8040 / src / main / java / org / opendaylight / restconf / nb / rfc8040 / rests / services / impl / SubscribeToStreamUtil.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.restconf.nb.rfc8040.rests.services.impl;
9
10 import com.google.common.base.Preconditions;
11 import com.google.common.base.Strings;
12 import java.net.URI;
13 import java.util.HashMap;
14 import java.util.Map;
15 import java.util.Optional;
16 import java.util.concurrent.ExecutionException;
17 import javax.ws.rs.core.UriBuilder;
18 import javax.ws.rs.core.UriInfo;
19 import org.eclipse.jdt.annotation.NonNull;
20 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
21 import org.opendaylight.mdsal.dom.api.DOMDataBroker;
22 import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeService;
23 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
24 import org.opendaylight.mdsal.dom.api.DOMDataTreeReadWriteTransaction;
25 import org.opendaylight.mdsal.dom.api.DOMNotificationListener;
26 import org.opendaylight.mdsal.dom.api.DOMNotificationService;
27 import org.opendaylight.mdsal.dom.api.DOMTransactionChain;
28 import org.opendaylight.restconf.common.errors.RestconfDocumentedException;
29 import org.opendaylight.restconf.common.errors.RestconfError.ErrorTag;
30 import org.opendaylight.restconf.common.errors.RestconfError.ErrorType;
31 import org.opendaylight.restconf.common.util.DataChangeScope;
32 import org.opendaylight.restconf.nb.rfc8040.Rfc8040.MonitoringModule;
33 import org.opendaylight.restconf.nb.rfc8040.rests.services.impl.RestconfStreamsSubscriptionServiceImpl.HandlersHolder;
34 import org.opendaylight.restconf.nb.rfc8040.rests.services.impl.RestconfStreamsSubscriptionServiceImpl.NotificationQueryParams;
35 import org.opendaylight.restconf.nb.rfc8040.rests.utils.ResolveEnumUtil;
36 import org.opendaylight.restconf.nb.rfc8040.rests.utils.RestconfStreamsConstants;
37 import org.opendaylight.restconf.nb.rfc8040.streams.listeners.ListenerAdapter;
38 import org.opendaylight.restconf.nb.rfc8040.streams.listeners.ListenersBroker;
39 import org.opendaylight.restconf.nb.rfc8040.streams.listeners.NotificationListenerAdapter;
40 import org.opendaylight.restconf.nb.rfc8040.utils.RestconfConstants;
41 import org.opendaylight.restconf.nb.rfc8040.utils.mapping.RestconfMappingNodeUtil;
42 import org.opendaylight.restconf.nb.rfc8040.utils.parser.IdentifierCodec;
43 import org.opendaylight.yangtools.concepts.ListenerRegistration;
44 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
45 import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
46 import org.opendaylight.yangtools.yang.model.api.stmt.SchemaNodeIdentifier.Absolute;
47 import org.slf4j.Logger;
48 import org.slf4j.LoggerFactory;
49
50 /**
51  * Subscribe to stream util class.
52  */
53 abstract class SubscribeToStreamUtil {
54     /**
55      * Implementation of SubscribeToStreamUtil for Server-sent events.
56      */
57     private static final class ServerSentEvents extends SubscribeToStreamUtil {
58         static final ServerSentEvents INSTANCE = new ServerSentEvents();
59
60         @Override
61         public URI prepareUriByStreamName(final UriInfo uriInfo, final String streamName) {
62             final UriBuilder uriBuilder = uriInfo.getBaseUriBuilder();
63             return uriBuilder.replacePath(RestconfConstants.BASE_URI_PATTERN + '/'
64                     + RestconfConstants.NOTIF + '/' + streamName).build();
65         }
66     }
67
68     /**
69      * Implementation of SubscribeToStreamUtil for Web sockets.
70      */
71     private static final class WebSockets extends SubscribeToStreamUtil {
72         static final WebSockets INSTANCE = new WebSockets();
73
74         @Override
75         public URI prepareUriByStreamName(final UriInfo uriInfo, final String streamName) {
76             final String scheme = uriInfo.getAbsolutePath().getScheme();
77             final UriBuilder uriBuilder = uriInfo.getBaseUriBuilder();
78             switch (scheme) {
79                 case "https":
80                     // Secured HTTP goes to Secured WebSockets
81                     uriBuilder.scheme("wss");
82                     break;
83                 case "http":
84                 default:
85                     // Unsecured HTTP and others go to unsecured WebSockets
86                     uriBuilder.scheme("ws");
87             }
88             return uriBuilder.replacePath(RestconfConstants.BASE_URI_PATTERN + '/' + streamName).build();
89         }
90     }
91
92
93     private static final Logger LOG = LoggerFactory.getLogger(SubscribeToStreamUtil.class);
94
95     SubscribeToStreamUtil() {
96         // Hidden on purpose
97     }
98
99     static SubscribeToStreamUtil serverSentEvents() {
100         return ServerSentEvents.INSTANCE;
101     }
102
103     static SubscribeToStreamUtil webSockets() {
104         return WebSockets.INSTANCE;
105     }
106
107     /**
108      * Prepare URL from base name and stream name.
109      *
110      * @param uriInfo base URL information
111      * @param streamName name of stream for create
112      * @return final URL
113      */
114     abstract @NonNull URI prepareUriByStreamName(UriInfo uriInfo, String streamName);
115
116     /**
117      * Register listener by streamName in identifier to listen to yang notifications, and put or delete information
118      * about listener to DS according to ietf-restconf-monitoring.
119      *
120      * @param identifier              Name of the stream.
121      * @param uriInfo                 URI information.
122      * @param notificationQueryParams Query parameters of notification.
123      * @param handlersHolder          Holder of handlers for notifications.
124      * @return Stream location for listening.
125      */
126     final @NonNull URI subscribeToYangStream(final String identifier, final UriInfo uriInfo,
127             final NotificationQueryParams notificationQueryParams, final HandlersHolder handlersHolder) {
128         final String streamName = ListenersBroker.createStreamNameFromUri(identifier);
129         if (Strings.isNullOrEmpty(streamName)) {
130             throw new RestconfDocumentedException("Stream name is empty.", ErrorType.PROTOCOL, ErrorTag.INVALID_VALUE);
131         }
132         final Optional<NotificationListenerAdapter> notificationListenerAdapter =
133                 ListenersBroker.getInstance().getNotificationListenerFor(streamName);
134
135         if (notificationListenerAdapter.isEmpty()) {
136             throw new RestconfDocumentedException(String.format(
137                     "Stream with name %s was not found.", streamName),
138                     ErrorType.PROTOCOL,
139                     ErrorTag.UNKNOWN_ELEMENT);
140         }
141
142         final DOMTransactionChain transactionChain = handlersHolder.getTransactionChainHandler().get();
143         final DOMDataTreeReadWriteTransaction writeTransaction = transactionChain.newReadWriteTransaction();
144         final EffectiveModelContext schemaContext = handlersHolder.getSchemaHandler().get();
145
146         final URI uri = prepareUriByStreamName(uriInfo, streamName);
147         registerToListenNotification(
148                 notificationListenerAdapter.get(), handlersHolder.getNotificationServiceHandler());
149         notificationListenerAdapter.get().setQueryParams(
150                 notificationQueryParams.getStart(),
151                 notificationQueryParams.getStop().orElse(null),
152                 notificationQueryParams.getFilter().orElse(null),
153                 false, notificationQueryParams.isSkipNotificationData());
154         notificationListenerAdapter.get().setCloseVars(
155                 handlersHolder.getTransactionChainHandler(), handlersHolder.getSchemaHandler());
156         final MapEntryNode mapToStreams = RestconfMappingNodeUtil.mapYangNotificationStreamByIetfRestconfMonitoring(
157                     notificationListenerAdapter.get().getSchemaPath().lastNodeIdentifier(),
158                     schemaContext.getNotifications(), notificationQueryParams.getStart(),
159                     notificationListenerAdapter.get().getOutputType(), uri);
160         writeDataToDS(schemaContext,
161             notificationListenerAdapter.get().getSchemaPath().lastNodeIdentifier().getLocalName(), writeTransaction,
162             mapToStreams);
163         submitData(writeTransaction);
164         transactionChain.close();
165         return uri;
166     }
167
168     /**
169      * Register listener by streamName in identifier to listen to data change notifications, and put or delete
170      * information about listener to DS according to ietf-restconf-monitoring.
171      *
172      * @param identifier              Identifier as stream name.
173      * @param uriInfo                 Base URI information.
174      * @param notificationQueryParams Query parameters of notification.
175      * @param handlersHolder          Holder of handlers for notifications.
176      * @return Location for listening.
177      */
178     final URI subscribeToDataStream(final String identifier, final UriInfo uriInfo,
179             final NotificationQueryParams notificationQueryParams, final HandlersHolder handlersHolder) {
180         final Map<String, String> mapOfValues = mapValuesFromUri(identifier);
181         final LogicalDatastoreType datastoreType = parseURIEnum(
182                 LogicalDatastoreType.class,
183                 mapOfValues.get(RestconfStreamsConstants.DATASTORE_PARAM_NAME));
184         if (datastoreType == null) {
185             final String message = "Stream name doesn't contain datastore value (pattern /datastore=)";
186             LOG.debug(message);
187             throw new RestconfDocumentedException(message, ErrorType.APPLICATION, ErrorTag.MISSING_ATTRIBUTE);
188         }
189
190         final DataChangeScope scope = parseURIEnum(
191                 DataChangeScope.class,
192                 mapOfValues.get(RestconfStreamsConstants.SCOPE_PARAM_NAME));
193         if (scope == null) {
194             final String message = "Stream name doesn't contains datastore value (pattern /scope=)";
195             LOG.warn(message);
196             throw new RestconfDocumentedException(message, ErrorType.APPLICATION, ErrorTag.MISSING_ATTRIBUTE);
197         }
198
199         final String streamName = ListenersBroker.createStreamNameFromUri(identifier);
200         final Optional<ListenerAdapter> listener = ListenersBroker.getInstance().getDataChangeListenerFor(streamName);
201         Preconditions.checkArgument(listener.isPresent(), "Listener doesn't exist : " + streamName);
202
203         listener.get().setQueryParams(
204                 notificationQueryParams.getStart(),
205                 notificationQueryParams.getStop().orElse(null),
206                 notificationQueryParams.getFilter().orElse(null),
207                 false, notificationQueryParams.isSkipNotificationData());
208         listener.get().setCloseVars(handlersHolder.getTransactionChainHandler(), handlersHolder.getSchemaHandler());
209         registration(datastoreType, listener.get(), handlersHolder.getDomDataBrokerHandler().get());
210
211         final URI uri = prepareUriByStreamName(uriInfo, streamName);
212         final DOMTransactionChain transactionChain = handlersHolder.getTransactionChainHandler().get();
213         final DOMDataTreeReadWriteTransaction writeTransaction = transactionChain.newReadWriteTransaction();
214         final EffectiveModelContext schemaContext = handlersHolder.getSchemaHandler().get();
215         final String serializedPath = IdentifierCodec.serialize(listener.get().getPath(), schemaContext);
216
217         final MapEntryNode mapToStreams =
218             RestconfMappingNodeUtil.mapDataChangeNotificationStreamByIetfRestconfMonitoring(listener.get().getPath(),
219                 notificationQueryParams.getStart(), listener.get().getOutputType(), uri, schemaContext, serializedPath);
220         writeDataToDS(schemaContext, serializedPath, writeTransaction, mapToStreams);
221         submitData(writeTransaction);
222         transactionChain.close();
223         return uri;
224     }
225
226     private static void writeDataToDS(final EffectiveModelContext schemaContext, final String name,
227             final DOMDataTreeReadWriteTransaction readWriteTransaction, final MapEntryNode mapToStreams) {
228         readWriteTransaction.merge(LogicalDatastoreType.OPERATIONAL,
229             // FIXME: do not use IdentifierCodec here
230             IdentifierCodec.deserialize(MonitoringModule.PATH_TO_STREAM_WITHOUT_KEY + name, schemaContext),
231             mapToStreams);
232     }
233
234     private static void submitData(final DOMDataTreeReadWriteTransaction readWriteTransaction) {
235         try {
236             readWriteTransaction.commit().get();
237         } catch (final InterruptedException | ExecutionException e) {
238             throw new RestconfDocumentedException("Problem while putting data to DS.", e);
239         }
240     }
241
242     /**
243      * Prepare map of URI parameter-values.
244      *
245      * @param identifier String identification of URI.
246      * @return Map od URI parameters and values.
247      */
248     private static Map<String, String> mapValuesFromUri(final String identifier) {
249         final HashMap<String, String> result = new HashMap<>();
250         for (final String token : RestconfConstants.SLASH_SPLITTER.split(identifier)) {
251             final String[] paramToken = token.split("=");
252             if (paramToken.length == 2) {
253                 result.put(paramToken[0], paramToken[1]);
254             }
255         }
256         return result;
257     }
258
259     /**
260      * Register data change listener in DOM data broker and set it to listener on stream.
261      *
262      * @param datastore     {@link LogicalDatastoreType}
263      * @param listener      listener on specific stream
264      * @param domDataBroker data broker for register data change listener
265      */
266     private static void registration(final LogicalDatastoreType datastore, final ListenerAdapter listener,
267             final DOMDataBroker domDataBroker) {
268         if (listener.isListening()) {
269             return;
270         }
271
272         final DOMDataTreeChangeService changeService = domDataBroker.getExtensions()
273                 .getInstance(DOMDataTreeChangeService.class);
274         if (changeService == null) {
275             throw new UnsupportedOperationException("DOMDataBroker does not support the DOMDataTreeChangeService");
276         }
277
278         final DOMDataTreeIdentifier root = new DOMDataTreeIdentifier(datastore, listener.getPath());
279         final ListenerRegistration<ListenerAdapter> registration =
280                 changeService.registerDataTreeChangeListener(root, listener);
281         listener.setRegistration(registration);
282     }
283
284     // FIXME: this method should be in NotificationListenerAdapter
285     private static void registerToListenNotification(final NotificationListenerAdapter listener,
286             final DOMNotificationService notificationService) {
287         if (listener.isListening()) {
288             return;
289         }
290
291         final Absolute path = listener.getSchemaPath();
292         final ListenerRegistration<DOMNotificationListener> registration =
293                 notificationService.registerNotificationListener(listener, path);
294         listener.setRegistration(registration);
295     }
296
297     /**
298      * Parse out enumeration from URI.
299      *
300      * @param clazz Target enumeration type.
301      * @param value String representation of enumeration value.
302      * @return Parsed enumeration type.
303      */
304     private static <T> T parseURIEnum(final Class<T> clazz, final String value) {
305         if (value == null || value.equals("")) {
306             return null;
307         }
308         return ResolveEnumUtil.resolveEnum(clazz, value);
309     }
310 }