Bug 4883 - implement query parameter - filter
[netconf.git] / restconf / sal-rest-connector / src / main / java / org / opendaylight / restconf / restful / utils / SubscribeToStreamUtil.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.restconf.restful.utils;
9
10 import com.google.common.base.Preconditions;
11 import com.google.common.base.Strings;
12 import java.net.URI;
13 import java.text.DateFormat;
14 import java.text.ParseException;
15 import java.text.SimpleDateFormat;
16 import java.util.ArrayList;
17 import java.util.Date;
18 import java.util.HashMap;
19 import java.util.List;
20 import java.util.Map;
21 import java.util.Map.Entry;
22 import javax.ws.rs.core.UriBuilder;
23 import javax.ws.rs.core.UriInfo;
24 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
25 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
26 import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
27 import org.opendaylight.controller.md.sal.dom.api.DOMDataChangeListener;
28 import org.opendaylight.controller.md.sal.dom.api.DOMNotificationListener;
29 import org.opendaylight.netconf.sal.restconf.impl.InstanceIdentifierContext;
30 import org.opendaylight.netconf.sal.restconf.impl.RestconfDocumentedException;
31 import org.opendaylight.netconf.sal.restconf.impl.RestconfError.ErrorTag;
32 import org.opendaylight.netconf.sal.restconf.impl.RestconfError.ErrorType;
33 import org.opendaylight.netconf.sal.streams.listeners.ListenerAdapter;
34 import org.opendaylight.netconf.sal.streams.listeners.NotificationListenerAdapter;
35 import org.opendaylight.netconf.sal.streams.listeners.Notificator;
36 import org.opendaylight.netconf.sal.streams.websockets.WebSocketServer;
37 import org.opendaylight.restconf.handlers.DOMDataBrokerHandler;
38 import org.opendaylight.restconf.handlers.NotificationServiceHandler;
39 import org.opendaylight.restconf.handlers.SchemaContextHandler;
40 import org.opendaylight.restconf.utils.RestconfConstants;
41 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.yang.types.rev130715.DateAndTime;
42 import org.opendaylight.yangtools.concepts.ListenerRegistration;
43 import org.opendaylight.yangtools.yang.common.QName;
44 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
45 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
46 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
47 import org.opendaylight.yangtools.yang.model.api.ContainerSchemaNode;
48 import org.opendaylight.yangtools.yang.model.api.DataSchemaNode;
49 import org.opendaylight.yangtools.yang.model.api.SchemaNode;
50 import org.opendaylight.yangtools.yang.model.api.SchemaPath;
51 import org.slf4j.Logger;
52 import org.slf4j.LoggerFactory;
53
54 /**
55  * Subscribe to stream util class
56  *
57  */
58 public final class SubscribeToStreamUtil {
59
60     private static final Logger LOG = LoggerFactory.getLogger(SubscribeToStreamUtil.class);
61
62     private SubscribeToStreamUtil() {
63         throw new UnsupportedOperationException("Util class");
64     }
65
66     /**
67      * Parse enum from URI
68      *
69      * @param clazz
70      *            - enum type
71      * @param value
72      *            - string of enum value
73      * @return enum
74      */
75     public static <T> T parseURIEnum(final Class<T> clazz, final String value) {
76         if ((value == null) || value.equals("")) {
77             return null;
78         }
79         return StreamUtil.resolveEnum(clazz, value);
80     }
81
82     /**
83      * Prepare map of values from URI
84      *
85      * @param identifier
86      *            - URI
87      * @return {@link Map}
88      */
89     public static Map<String, String> mapValuesFromUri(final String identifier) {
90         final HashMap<String, String> result = new HashMap<>();
91         final String[] tokens = identifier.split(String.valueOf(RestconfConstants.SLASH));
92         for (final String token : tokens) {
93             final String[] paramToken = token.split(String.valueOf(RestconfStreamsConstants.EQUAL));
94             if (paramToken.length == 2) {
95                 result.put(paramToken[0], paramToken[1]);
96             }
97         }
98         return result;
99     }
100
101     /**
102      * Register data change listener in dom data broker and set it to listener
103      * on stream
104      *
105      * @param ds
106      *            - {@link LogicalDatastoreType}
107      * @param scope
108      *            - {@link DataChangeScope}
109      * @param listener
110      *            - listener on specific stream
111      * @param domDataBroker
112      *            - data broker for register data change listener
113      */
114     private static void registration(final LogicalDatastoreType ds, final DataChangeScope scope,
115             final ListenerAdapter listener, final DOMDataBroker domDataBroker) {
116         if (listener.isListening()) {
117             return;
118         }
119
120         final YangInstanceIdentifier path = listener.getPath();
121         final ListenerRegistration<DOMDataChangeListener> registration = domDataBroker.registerDataChangeListener(ds,
122                 path, listener, scope);
123
124         listener.setRegistration(registration);
125     }
126
127     /**
128      * Get port from web socket server. If doesn't exit, create it.
129      *
130      * @return port
131      */
132     private static int prepareNotificationPort() {
133         int port = RestconfStreamsConstants.NOTIFICATION_PORT;
134         try {
135             final WebSocketServer webSocketServer = WebSocketServer.getInstance();
136             port = webSocketServer.getPort();
137         } catch (final NullPointerException e) {
138             WebSocketServer.createInstance(RestconfStreamsConstants.NOTIFICATION_PORT);
139         }
140         return port;
141     }
142
143     /**
144      * Register listeners by streamName in identifier to listen to yang notifications
145      *
146      * @param identifier
147      *            - identifier as stream name
148      * @param uriInfo
149      *            - for getting base URI information
150      * @param start
151      *            - start-time query parameter
152      * @param stop
153      *            - stop-time query parameter
154      * @param notifiServiceHandler
155      *            - DOMNotificationService handler for register listeners
156      * @param filter
157      *            - indicate which subset of all possible events are of interest
158      * @return location for listening
159      */
160     public static URI notifStream(final String identifier, final UriInfo uriInfo, final Date start, final Date stop,
161             final NotificationServiceHandler notifiServiceHandler, final String filter) {
162         final String streamName = Notificator.createStreamNameFromUri(identifier);
163         if (Strings.isNullOrEmpty(streamName)) {
164             throw new RestconfDocumentedException("Stream name is empty.", ErrorType.PROTOCOL, ErrorTag.INVALID_VALUE);
165         }
166         final List<NotificationListenerAdapter> listeners = Notificator.getNotificationListenerFor(streamName);
167         if ((listeners == null) || listeners.isEmpty()) {
168             throw new RestconfDocumentedException("Stream was not found.", ErrorType.PROTOCOL,
169                     ErrorTag.UNKNOWN_ELEMENT);
170         }
171
172         for (final NotificationListenerAdapter listener : listeners) {
173             registerToListenNotification(listener, notifiServiceHandler);
174             listener.setQueryParams(start, stop, filter);
175         }
176
177         final UriBuilder uriBuilder = uriInfo.getAbsolutePathBuilder();
178         int notificationPort = RestconfStreamsConstants.NOTIFICATION_PORT;
179         try {
180             final WebSocketServer webSocketServerInstance = WebSocketServer.getInstance();
181             notificationPort = webSocketServerInstance.getPort();
182         } catch (final NullPointerException e) {
183             WebSocketServer.createInstance(RestconfStreamsConstants.NOTIFICATION_PORT);
184         }
185         final UriBuilder uriToWebsocketServerBuilder = uriBuilder.port(notificationPort).scheme("ws");
186         final URI uriToWebsocketServer = uriToWebsocketServerBuilder.replacePath(streamName).build();
187
188         return uriToWebsocketServer;
189     }
190
191     private static void registerToListenNotification(final NotificationListenerAdapter listener,
192             final NotificationServiceHandler notificationServiceHandler) {
193         if (listener.isListening()) {
194             return;
195         }
196
197         final SchemaPath path = listener.getSchemaPath();
198         final ListenerRegistration<DOMNotificationListener> registration =
199                 notificationServiceHandler.get().registerNotificationListener(listener, path);
200
201         listener.setRegistration(registration);
202     }
203
204     /**
205      * Prepare InstanceIdentifierContext for Location leaf
206      *
207      * @param schemaHandler
208      *            - schemaContext handler
209      * @return InstanceIdentifier of Location leaf
210      */
211     public static InstanceIdentifierContext<?> prepareIIDSubsStreamOutput(final SchemaContextHandler schemaHandler) {
212         final QName qnameBase = QName.create("subscribe:to:notification", "2016-10-28", "notifi");
213         final DataSchemaNode location = ((ContainerSchemaNode) schemaHandler.get()
214                 .findModuleByNamespaceAndRevision(qnameBase.getNamespace(), qnameBase.getRevision())
215                 .getDataChildByName(qnameBase)).getDataChildByName(QName.create(qnameBase, "location"));
216         final List<PathArgument> path = new ArrayList<>();
217         path.add(NodeIdentifier.create(qnameBase));
218         path.add(NodeIdentifier.create(QName.create(qnameBase, "location")));
219
220         return new InstanceIdentifierContext<SchemaNode>(YangInstanceIdentifier.create(path), location, null,
221                 schemaHandler.get());
222     }
223
224     /**
225      * Register listener by streamName in identifier to listen to yang notifications
226      *
227      * @param identifier
228      *            - identifier as stream name
229      * @param uriInfo
230      *            - for getting base URI information
231      * @param start
232      *            - start-time query parameter
233      * @param stop
234      *            - stop-time query parameter
235      * @param domDataBrokerHandler
236      *            - DOMDataBroker handler for register listener
237      * @param filter
238      *            - indicate which subset of all possible events are of interest
239      * @return location for listening
240      */
241     public static URI dataSubs(final String identifier, final UriInfo uriInfo, final Date start, final Date stop,
242             final DOMDataBrokerHandler domDataBrokerHandler, final String filter) {
243         final Map<String, String> mapOfValues = SubscribeToStreamUtil.mapValuesFromUri(identifier);
244
245         final LogicalDatastoreType ds = SubscribeToStreamUtil.parseURIEnum(LogicalDatastoreType.class,
246                 mapOfValues.get(RestconfStreamsConstants.DATASTORE_PARAM_NAME));
247         if (ds == null) {
248             final String msg = "Stream name doesn't contains datastore value (pattern /datastore=)";
249             LOG.debug(msg);
250             throw new RestconfDocumentedException(msg, ErrorType.APPLICATION, ErrorTag.MISSING_ATTRIBUTE);
251         }
252
253         final DataChangeScope scope = SubscribeToStreamUtil.parseURIEnum(DataChangeScope.class,
254                 mapOfValues.get(RestconfStreamsConstants.SCOPE_PARAM_NAME));
255         if (scope == null) {
256             final String msg = "Stream name doesn't contains datastore value (pattern /scope=)";
257             LOG.warn(msg);
258             throw new RestconfDocumentedException(msg, ErrorType.APPLICATION, ErrorTag.MISSING_ATTRIBUTE);
259         }
260
261         final String streamName = Notificator.createStreamNameFromUri(identifier);
262
263         final ListenerAdapter listener = Notificator.getListenerFor(streamName);
264         Preconditions.checkNotNull(listener, "Listener doesn't exist : " + streamName);
265
266         listener.setQueryParams(start, stop, filter);
267
268         SubscribeToStreamUtil.registration(ds, scope, listener, domDataBrokerHandler.get());
269
270         final int port = SubscribeToStreamUtil.prepareNotificationPort();
271
272         final UriBuilder uriBuilder = uriInfo.getAbsolutePathBuilder();
273         final UriBuilder uriToWebSocketServer =
274                 uriBuilder.port(port).scheme(RestconfStreamsConstants.SCHEMA_SUBSCIBRE_URI);
275         return uriToWebSocketServer.replacePath(streamName).build();
276     }
277
278     public static Date parseDateFromQueryParam(final Entry<String, List<String>> entry) {
279         final DateAndTime event = new DateAndTime(entry.getValue().iterator().next());
280         String numOf_ms = "";
281         final String value = event.getValue();
282         if (value.contains(".")) {
283             numOf_ms = numOf_ms + ".";
284             final int lastChar = value.contains("Z") ? value.indexOf("Z") : (value.contains("+") ? value.indexOf("+")
285                     : (value.contains("-") ? value.indexOf("-") : value.length()));
286             for (int i = 0; i < (lastChar - value.indexOf(".") - 1); i++) {
287                 numOf_ms = numOf_ms + "S";
288             }
289         }
290         String zone = "";
291         if (!value.contains("Z")) {
292             zone = zone + "XXX";
293         }
294         final DateFormat dateFormatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss" + numOf_ms + zone);
295
296         try {
297             return dateFormatter.parse(value.contains("Z") ? value.replace('T', ' ').substring(0, value.indexOf("Z"))
298                     : value.replace('T', ' '));
299         } catch (final ParseException e) {
300             throw new RestconfDocumentedException("Cannot parse of value in date: " + value + e);
301         }
302     }
303 }