fddbad9ee1ca8d1811217914eee741e07d9c5ca9
[netconf.git] / restconf / sal-rest-connector / src / main / java / org / opendaylight / restconf / restful / utils / SubscribeToStreamUtil.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.restconf.restful.utils;
9
10 import com.google.common.base.Preconditions;
11 import com.google.common.base.Strings;
12 import java.net.URI;
13 import java.text.DateFormat;
14 import java.text.ParseException;
15 import java.text.SimpleDateFormat;
16 import java.util.ArrayList;
17 import java.util.Date;
18 import java.util.HashMap;
19 import java.util.List;
20 import java.util.Map;
21 import java.util.Map.Entry;
22 import javax.ws.rs.core.UriBuilder;
23 import javax.ws.rs.core.UriInfo;
24 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
25 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
26 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
27 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
28 import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
29 import org.opendaylight.controller.md.sal.dom.api.DOMDataChangeListener;
30 import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction;
31 import org.opendaylight.controller.md.sal.dom.api.DOMNotificationListener;
32 import org.opendaylight.netconf.sal.restconf.impl.InstanceIdentifierContext;
33 import org.opendaylight.netconf.sal.restconf.impl.RestconfDocumentedException;
34 import org.opendaylight.netconf.sal.restconf.impl.RestconfError.ErrorTag;
35 import org.opendaylight.netconf.sal.restconf.impl.RestconfError.ErrorType;
36 import org.opendaylight.netconf.sal.streams.listeners.ListenerAdapter;
37 import org.opendaylight.netconf.sal.streams.listeners.NotificationListenerAdapter;
38 import org.opendaylight.netconf.sal.streams.listeners.Notificator;
39 import org.opendaylight.netconf.sal.streams.websockets.WebSocketServer;
40 import org.opendaylight.restconf.Rfc8040.MonitoringModule;
41 import org.opendaylight.restconf.handlers.NotificationServiceHandler;
42 import org.opendaylight.restconf.handlers.SchemaContextHandler;
43 import org.opendaylight.restconf.parser.IdentifierCodec;
44 import org.opendaylight.restconf.restful.services.impl.RestconfStreamsSubscriptionServiceImpl.HandlersHolder;
45 import org.opendaylight.restconf.restful.services.impl.RestconfStreamsSubscriptionServiceImpl.NotificationQueryParams;
46 import org.opendaylight.restconf.utils.RestconfConstants;
47 import org.opendaylight.restconf.utils.mapping.RestconfMappingNodeUtil;
48 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.yang.types.rev130715.DateAndTime;
49 import org.opendaylight.yangtools.concepts.ListenerRegistration;
50 import org.opendaylight.yangtools.yang.common.QName;
51 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
52 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
53 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
54 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
55 import org.opendaylight.yangtools.yang.model.api.ContainerSchemaNode;
56 import org.opendaylight.yangtools.yang.model.api.DataSchemaNode;
57 import org.opendaylight.yangtools.yang.model.api.Module;
58 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
59 import org.opendaylight.yangtools.yang.model.api.SchemaNode;
60 import org.opendaylight.yangtools.yang.model.api.SchemaPath;
61 import org.slf4j.Logger;
62 import org.slf4j.LoggerFactory;
63
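/**
 * Static helpers for subscribing to notification streams: they register the
 * listeners bound to a stream, record the stream in the operational datastore
 * and build the URI on which the stream can be listened to.
 */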
68 public final class SubscribeToStreamUtil {
69
70     private static final Logger LOG = LoggerFactory.getLogger(SubscribeToStreamUtil.class);
71
72     private SubscribeToStreamUtil() {
73         throw new UnsupportedOperationException("Util class");
74     }
75
76     /**
77      * Register listeners by streamName in identifier to listen to yang
78      * notifications, put or delete info about listener to DS according to
79      * ietf-restconf-monitoring
80      *
81      * @param identifier
82      *            - identifier as stream name
83      * @param uriInfo
84      *            - for getting base URI information
85      * @param notificationQueryParams
86      *            - query parameters of notification
87      * @param handlersHolder
88      *            - holder of handlers for notifications
89      * @return location for listening
90      */
91     @SuppressWarnings("rawtypes")
92     public static URI notifYangStream(final String identifier, final UriInfo uriInfo,
93             final NotificationQueryParams notificationQueryParams, final HandlersHolder handlersHolder) {
94         final String streamName = Notificator.createStreamNameFromUri(identifier);
95         if (Strings.isNullOrEmpty(streamName)) {
96             throw new RestconfDocumentedException("Stream name is empty.", ErrorType.PROTOCOL, ErrorTag.INVALID_VALUE);
97         }
98         final List<NotificationListenerAdapter> listeners = Notificator.getNotificationListenerFor(streamName);
99         if ((listeners == null) || listeners.isEmpty()) {
100             throw new RestconfDocumentedException("Stream was not found.", ErrorType.PROTOCOL,
101                     ErrorTag.UNKNOWN_ELEMENT);
102         }
103
104         final DOMDataReadWriteTransaction wTx =
105                 handlersHolder.getTransactionChainHandler().get().newReadWriteTransaction();
106         final SchemaContext schemaContext = handlersHolder.getSchemaHandler().get();
107         final boolean exist = checkExist(schemaContext, wTx);
108
109         final URI uri = prepareUriByStreamName(uriInfo, streamName);
110         for (final NotificationListenerAdapter listener : listeners) {
111             registerToListenNotification(listener, handlersHolder.getNotificationServiceHandler());
112             listener.setQueryParams(notificationQueryParams.getStart(), notificationQueryParams.getStop(),
113                     notificationQueryParams.getFilter());
114             listener.setCloseVars(handlersHolder.getTransactionChainHandler(), handlersHolder.getSchemaHandler());
115             final NormalizedNode mapToStreams =
116                     RestconfMappingNodeUtil.mapYangNotificationStreamByIetfRestconfMonitoring(listener.getSchemaPath().getLastComponent(),
117                             schemaContext.getNotifications(), notificationQueryParams.getStart(),
118                             listener.getOutputType(), uri, getMonitoringModule(schemaContext), exist);
119             writeDataToDS(schemaContext, listener.getSchemaPath().getLastComponent().getLocalName(), wTx, exist,
120                     mapToStreams);
121         }
122         submitData(wTx);
123
124         return uri;
125     }
126
127     /**
128      * Prepare InstanceIdentifierContext for Location leaf
129      *
130      * @param schemaHandler
131      *            - schemaContext handler
132      * @return InstanceIdentifier of Location leaf
133      */
134     public static InstanceIdentifierContext<?> prepareIIDSubsStreamOutput(final SchemaContextHandler schemaHandler) {
135         final QName qnameBase = QName.create("subscribe:to:notification", "2016-10-28", "notifi");
136         final DataSchemaNode location = ((ContainerSchemaNode) schemaHandler.get()
137                 .findModuleByNamespaceAndRevision(qnameBase.getNamespace(), qnameBase.getRevision())
138                 .getDataChildByName(qnameBase)).getDataChildByName(QName.create(qnameBase, "location"));
139         final List<PathArgument> path = new ArrayList<>();
140         path.add(NodeIdentifier.create(qnameBase));
141         path.add(NodeIdentifier.create(QName.create(qnameBase, "location")));
142
143         return new InstanceIdentifierContext<SchemaNode>(YangInstanceIdentifier.create(path), location, null,
144                 schemaHandler.get());
145     }
146
147     /**
148      * Register listener by streamName in identifier to listen to data change
149      * notifications, put or delete info about listener to DS according to
150      * ietf-restconf-monitoring
151      *
152      * @param identifier
153      *            - identifier as stream name
154      * @param uriInfo
155      *            - for getting base URI information
156      * @param notificationQueryParams
157      *            - query parameters of notification
158      * @param handlersHolder
159      *            - holder of handlers for notifications
160      * @return location for listening
161      */
162     @SuppressWarnings("rawtypes")
163     public static URI notifiDataStream(final String identifier, final UriInfo uriInfo,
164             final NotificationQueryParams notificationQueryParams, final HandlersHolder handlersHolder) {
165         final Map<String, String> mapOfValues = SubscribeToStreamUtil.mapValuesFromUri(identifier);
166
167         final LogicalDatastoreType ds = SubscribeToStreamUtil.parseURIEnum(LogicalDatastoreType.class,
168                 mapOfValues.get(RestconfStreamsConstants.DATASTORE_PARAM_NAME));
169         if (ds == null) {
170             final String msg = "Stream name doesn't contains datastore value (pattern /datastore=)";
171             LOG.debug(msg);
172             throw new RestconfDocumentedException(msg, ErrorType.APPLICATION, ErrorTag.MISSING_ATTRIBUTE);
173         }
174
175         final DataChangeScope scope = SubscribeToStreamUtil.parseURIEnum(DataChangeScope.class,
176                 mapOfValues.get(RestconfStreamsConstants.SCOPE_PARAM_NAME));
177         if (scope == null) {
178             final String msg = "Stream name doesn't contains datastore value (pattern /scope=)";
179             LOG.warn(msg);
180             throw new RestconfDocumentedException(msg, ErrorType.APPLICATION, ErrorTag.MISSING_ATTRIBUTE);
181         }
182
183         final String streamName = Notificator.createStreamNameFromUri(identifier);
184
185         final ListenerAdapter listener = Notificator.getListenerFor(streamName);
186         Preconditions.checkNotNull(listener, "Listener doesn't exist : " + streamName);
187
188         listener.setQueryParams(notificationQueryParams.getStart(), notificationQueryParams.getStop(),
189                 notificationQueryParams.getFilter());
190         listener.setCloseVars(handlersHolder.getTransactionChainHandler(), handlersHolder.getSchemaHandler());
191
192         registration(ds, scope, listener, handlersHolder.getDomDataBrokerHandler().get());
193
194         final URI uri = prepareUriByStreamName(uriInfo, streamName);
195
196         final DOMDataReadWriteTransaction wTx =
197                 handlersHolder.getTransactionChainHandler().get().newReadWriteTransaction();
198         final SchemaContext schemaContext = handlersHolder.getSchemaHandler().get();
199         final boolean exist = checkExist(schemaContext, wTx);
200
201         final NormalizedNode mapToStreams = RestconfMappingNodeUtil
202                 .mapDataChangeNotificationStreamByIetfRestconfMonitoring(listener.getPath(),
203                         notificationQueryParams.getStart(), listener.getOutputType(), uri,
204                         getMonitoringModule(schemaContext), exist, schemaContext);
205         writeDataToDS(schemaContext, listener.getPath().getLastPathArgument().getNodeType().getLocalName(), wTx, exist,
206                 mapToStreams);
207         submitData(wTx);
208         return uri;
209     }
210
211     public static Module getMonitoringModule(final SchemaContext schemaContext) {
212         final Module monitoringModule =
213                 schemaContext.findModuleByNamespaceAndRevision(MonitoringModule.URI_MODULE, MonitoringModule.DATE);
214         return monitoringModule;
215     }
216
217     /**
218      * Parse input of query parameters - start-time or stop-time - from
219      * {@link DateAndTime} format to {@link Date} format
220      *
221      * @param entry
222      *            - start-time or stop-time as string in {@link DateAndTime}
223      *            format
224      * @return parsed {@link Date} by entry
225      */
226     public static Date parseDateFromQueryParam(final Entry<String, List<String>> entry) {
227         final DateAndTime event = new DateAndTime(entry.getValue().iterator().next());
228         String numOf_ms = "";
229         final String value = event.getValue();
230         if (value.contains(".")) {
231             numOf_ms = numOf_ms + ".";
232             final int lastChar = value.contains("Z") ? value.indexOf("Z") : (value.contains("+") ? value.indexOf("+")
233                     : (value.contains("-") ? value.indexOf("-") : value.length()));
234             for (int i = 0; i < (lastChar - value.indexOf(".") - 1); i++) {
235                 numOf_ms = numOf_ms + "S";
236             }
237         }
238         String zone = "";
239         if (!value.contains("Z")) {
240             zone = zone + "XXX";
241         }
242         final DateFormat dateFormatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss" + numOf_ms + zone);
243
244         try {
245             return dateFormatter.parse(value.contains("Z") ? value.replace('T', ' ').substring(0, value.indexOf("Z"))
246                     : value.replace('T', ' '));
247         } catch (final ParseException e) {
248             throw new RestconfDocumentedException("Cannot parse of value in date: " + value + e);
249         }
250     }
251
252     @SuppressWarnings("rawtypes")
253     private static void writeDataToDS(final SchemaContext schemaContext, final String name,
254             final DOMDataReadWriteTransaction wTx, final boolean exist, final NormalizedNode mapToStreams) {
255         String pathId = "";
256         if (exist) {
257             pathId = MonitoringModule.PATH_TO_STREAM_WITHOUT_KEY + name;
258         } else {
259             pathId = MonitoringModule.PATH_TO_STREAMS;
260         }
261         wTx.merge(LogicalDatastoreType.OPERATIONAL, IdentifierCodec.deserialize(pathId, schemaContext),
262                 mapToStreams);
263     }
264
265     private static void submitData(final DOMDataReadWriteTransaction wTx) {
266         try {
267             wTx.submit().checkedGet();
268         } catch (final TransactionCommitFailedException e) {
269             throw new RestconfDocumentedException("Problem while putting data to DS.", e);
270         }
271     }
272
273     /**
274      * Prepare map of values from URI
275      *
276      * @param identifier
277      *            - URI
278      * @return {@link Map}
279      */
280     public static Map<String, String> mapValuesFromUri(final String identifier) {
281         final HashMap<String, String> result = new HashMap<>();
282         final String[] tokens = identifier.split(String.valueOf(RestconfConstants.SLASH));
283         for (final String token : tokens) {
284             final String[] paramToken = token.split(String.valueOf(RestconfStreamsConstants.EQUAL));
285             if (paramToken.length == 2) {
286                 result.put(paramToken[0], paramToken[1]);
287             }
288         }
289         return result;
290     }
291
292     private static URI prepareUriByStreamName(final UriInfo uriInfo, final String streamName) {
293         final int port = SubscribeToStreamUtil.prepareNotificationPort();
294
295         final UriBuilder uriBuilder = uriInfo.getAbsolutePathBuilder();
296         final UriBuilder uriToWebSocketServer =
297                 uriBuilder.port(port).scheme(RestconfStreamsConstants.SCHEMA_SUBSCIBRE_URI);
298         final URI uri = uriToWebSocketServer.replacePath(streamName).build();
299         return uri;
300     }
301
302     /**
303      * Register data change listener in dom data broker and set it to listener
304      * on stream
305      *
306      * @param ds
307      *            - {@link LogicalDatastoreType}
308      * @param scope
309      *            - {@link DataChangeScope}
310      * @param listener
311      *            - listener on specific stream
312      * @param domDataBroker
313      *            - data broker for register data change listener
314      */
315     @SuppressWarnings("deprecation")
316     private static void registration(final LogicalDatastoreType ds, final DataChangeScope scope,
317             final ListenerAdapter listener, final DOMDataBroker domDataBroker) {
318         if (listener.isListening()) {
319             return;
320         }
321
322         final YangInstanceIdentifier path = listener.getPath();
323         final ListenerRegistration<DOMDataChangeListener> registration =
324                 domDataBroker.registerDataChangeListener(ds, path, listener, scope);
325
326         listener.setRegistration(registration);
327     }
328
329     /**
330      * Get port from web socket server. If doesn't exit, create it.
331      *
332      * @return port
333      */
334     private static int prepareNotificationPort() {
335         int port = RestconfStreamsConstants.NOTIFICATION_PORT;
336         try {
337             final WebSocketServer webSocketServer = WebSocketServer.getInstance();
338             port = webSocketServer.getPort();
339         } catch (final NullPointerException e) {
340             WebSocketServer.createInstance(RestconfStreamsConstants.NOTIFICATION_PORT);
341         }
342         return port;
343     }
344
345     private static boolean checkExist(final SchemaContext schemaContext, final DOMDataReadWriteTransaction wTx) {
346         boolean exist;
347         try {
348             exist = wTx.exists(LogicalDatastoreType.OPERATIONAL,
349                     IdentifierCodec.deserialize(MonitoringModule.PATH_TO_STREAMS, schemaContext)).checkedGet();
350         } catch (final ReadFailedException e1) {
351             throw new RestconfDocumentedException("Problem while checking data if exists", e1);
352         }
353         return exist;
354     }
355
356     private static void registerToListenNotification(final NotificationListenerAdapter listener,
357             final NotificationServiceHandler notificationServiceHandler) {
358         if (listener.isListening()) {
359             return;
360         }
361
362         final SchemaPath path = listener.getSchemaPath();
363         final ListenerRegistration<DOMNotificationListener> registration =
364                 notificationServiceHandler.get().registerNotificationListener(listener, path);
365
366         listener.setRegistration(registration);
367     }
368
369     /**
370      * Parse enum from URI
371      *
372      * @param clazz
373      *            - enum type
374      * @param value
375      *            - string of enum value
376      * @return enum
377      */
378     private static <T> T parseURIEnum(final Class<T> clazz, final String value) {
379         if ((value == null) || value.equals("")) {
380             return null;
381         }
382         return ResolveEnumUtil.resolveEnum(clazz, value);
383     }
384
385 }