Reduce StreamsConfiguration proliferation
[netconf.git] / restconf / restconf-nb / src / main / java / org / opendaylight / restconf / nb / rfc8040 / rests / services / impl / SubscribeToStreamUtil.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.restconf.nb.rfc8040.rests.services.impl;
9
10 import static com.google.common.base.Strings.isNullOrEmpty;
11 import static java.util.Objects.requireNonNull;
12
13 import com.google.common.base.Splitter;
14 import java.net.URI;
15 import java.util.HashMap;
16 import java.util.Map;
17 import java.util.concurrent.ExecutionException;
18 import javax.ws.rs.core.UriBuilder;
19 import javax.ws.rs.core.UriInfo;
20 import org.eclipse.jdt.annotation.NonNull;
21 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
22 import org.opendaylight.mdsal.dom.api.DOMDataBroker;
23 import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteOperations;
24 import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteTransaction;
25 import org.opendaylight.restconf.common.errors.RestconfDocumentedException;
26 import org.opendaylight.restconf.nb.rfc8040.NotificationQueryParams;
27 import org.opendaylight.restconf.nb.rfc8040.URLConstants;
28 import org.opendaylight.restconf.nb.rfc8040.databind.DatabindProvider;
29 import org.opendaylight.restconf.nb.rfc8040.monitoring.RestconfStateStreams;
30 import org.opendaylight.restconf.nb.rfc8040.rests.services.impl.RestconfStreamsSubscriptionServiceImpl.HandlersHolder;
31 import org.opendaylight.restconf.nb.rfc8040.rests.utils.RestconfStreamsConstants;
32 import org.opendaylight.restconf.nb.rfc8040.streams.listeners.ListenerAdapter;
33 import org.opendaylight.restconf.nb.rfc8040.streams.listeners.ListenersBroker;
34 import org.opendaylight.restconf.nb.rfc8040.utils.parser.IdentifierCodec;
35 import org.opendaylight.yangtools.yang.common.ErrorTag;
36 import org.opendaylight.yangtools.yang.common.ErrorType;
37 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
38 import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
39 import org.slf4j.Logger;
40 import org.slf4j.LoggerFactory;
41
42 /**
43  * Subscribe to stream util class.
44  */
45 public abstract class SubscribeToStreamUtil {
46     /**
47      * Implementation of SubscribeToStreamUtil for Server-sent events.
48      */
49     private static final class ServerSentEvents extends SubscribeToStreamUtil {
50         ServerSentEvents(final ListenersBroker listenersBroker) {
51             super(listenersBroker);
52         }
53
54         @Override
55         public URI prepareUriByStreamName(final UriInfo uriInfo, final String streamName) {
56             return uriInfo.getBaseUriBuilder()
57                 .replacePath(URLConstants.BASE_PATH + '/' + URLConstants.SSE_SUBPATH + '/' + streamName)
58                 .build();
59         }
60     }
61
62     /**
63      * Implementation of SubscribeToStreamUtil for Web sockets.
64      */
65     private static final class WebSockets extends SubscribeToStreamUtil {
66         WebSockets(final ListenersBroker listenersBroker) {
67             super(listenersBroker);
68         }
69
70         @Override
71         public URI prepareUriByStreamName(final UriInfo uriInfo, final String streamName) {
72             final String scheme = uriInfo.getAbsolutePath().getScheme();
73             final UriBuilder uriBuilder = uriInfo.getBaseUriBuilder();
74             switch (scheme) {
75                 case "https":
76                     // Secured HTTP goes to Secured WebSockets
77                     uriBuilder.scheme("wss");
78                     break;
79                 case "http":
80                 default:
81                     // Unsecured HTTP and others go to unsecured WebSockets
82                     uriBuilder.scheme("ws");
83             }
84             return uriBuilder.replacePath(URLConstants.BASE_PATH + '/' + streamName).build();
85         }
86     }
87
88     private static final Logger LOG = LoggerFactory.getLogger(SubscribeToStreamUtil.class);
89     private static final Splitter SLASH_SPLITTER = Splitter.on('/');
90
91     private final @NonNull ListenersBroker listenersBroker;
92
93     private SubscribeToStreamUtil(final ListenersBroker listenersBroker) {
94         this.listenersBroker = requireNonNull(listenersBroker);
95     }
96
97     public static @NonNull SubscribeToStreamUtil serverSentEvents(final ListenersBroker listenersBroker) {
98         return new ServerSentEvents(listenersBroker);
99     }
100
101     public static @NonNull SubscribeToStreamUtil webSockets(final ListenersBroker listenersBroker) {
102         return new WebSockets(listenersBroker);
103     }
104
105     public final @NonNull ListenersBroker listenersBroker() {
106         return listenersBroker;
107     }
108
109     /**
110      * Prepare URL from base name and stream name.
111      *
112      * @param uriInfo base URL information
113      * @param streamName name of stream for create
114      * @return final URL
115      */
116     abstract @NonNull URI prepareUriByStreamName(UriInfo uriInfo, String streamName);
117
118     /**
119      * Register listener by streamName in identifier to listen to yang notifications, and put or delete information
120      * about listener to DS according to ietf-restconf-monitoring.
121      *
122      * @param identifier              Name of the stream.
123      * @param uriInfo                 URI information.
124      * @param notificationQueryParams Query parameters of notification.
125      * @param handlersHolder          Holder of handlers for notifications.
126      * @return Stream location for listening.
127      */
128     final @NonNull URI subscribeToYangStream(final String identifier, final UriInfo uriInfo,
129             final NotificationQueryParams notificationQueryParams, final HandlersHolder handlersHolder) {
130         final String streamName = ListenersBroker.createStreamNameFromUri(identifier);
131         if (isNullOrEmpty(streamName)) {
132             throw new RestconfDocumentedException("Stream name is empty.", ErrorType.PROTOCOL, ErrorTag.INVALID_VALUE);
133         }
134
135         final var notificationListenerAdapter = listenersBroker.notificationListenerFor(streamName);
136         if (notificationListenerAdapter == null) {
137             throw new RestconfDocumentedException(String.format("Stream with name %s was not found.", streamName),
138                 ErrorType.PROTOCOL, ErrorTag.UNKNOWN_ELEMENT);
139         }
140
141         final URI uri = prepareUriByStreamName(uriInfo, streamName);
142         notificationListenerAdapter.setQueryParams(notificationQueryParams);
143         notificationListenerAdapter.listen(handlersHolder.getNotificationServiceHandler());
144         final DOMDataBroker dataBroker = handlersHolder.getDataBroker();
145         notificationListenerAdapter.setCloseVars(dataBroker, handlersHolder.getDatabindProvider());
146         final MapEntryNode mapToStreams = RestconfStateStreams.notificationStreamEntry(streamName,
147             notificationListenerAdapter.qnames(), notificationListenerAdapter.getStart(),
148             notificationListenerAdapter.getOutputType(), uri);
149
150         // FIXME: how does this correlate with the transaction notificationListenerAdapter.close() will do?
151         final DOMDataTreeWriteTransaction writeTransaction = dataBroker.newWriteOnlyTransaction();
152         writeDataToDS(writeTransaction, mapToStreams);
153         submitData(writeTransaction);
154         return uri;
155     }
156
157     /**
158      * Register listener by streamName in identifier to listen to data change notifications, and put or delete
159      * information about listener to DS according to ietf-restconf-monitoring.
160      *
161      * @param identifier              Identifier as stream name.
162      * @param uriInfo                 Base URI information.
163      * @param notificationQueryParams Query parameters of notification.
164      * @param handlersHolder          Holder of handlers for notifications.
165      * @return Location for listening.
166      */
167     final URI subscribeToDataStream(final String identifier, final UriInfo uriInfo,
168             final NotificationQueryParams notificationQueryParams, final HandlersHolder handlersHolder) {
169         final Map<String, String> mapOfValues = mapValuesFromUri(identifier);
170
171         final String datastoreParam = mapOfValues.get(RestconfStreamsConstants.DATASTORE_PARAM_NAME);
172         if (isNullOrEmpty(datastoreParam)) {
173             final String message = "Stream name does not contain datastore value (pattern /datastore=)";
174             LOG.debug(message);
175             throw new RestconfDocumentedException(message, ErrorType.APPLICATION, ErrorTag.MISSING_ATTRIBUTE);
176         }
177
178         // FIXME: this is kept only for compatibility, we are not using this parameter
179         if (isNullOrEmpty(mapOfValues.get(RestconfStreamsConstants.SCOPE_PARAM_NAME))) {
180             final String message = "Stream name does not contain scope value (pattern /scope=)";
181             LOG.warn(message);
182             throw new RestconfDocumentedException(message, ErrorType.APPLICATION, ErrorTag.MISSING_ATTRIBUTE);
183         }
184
185         final String streamName = ListenersBroker.createStreamNameFromUri(identifier);
186         final ListenerAdapter listener = listenersBroker.dataChangeListenerFor(streamName);
187         if (listener == null) {
188             throw new RestconfDocumentedException("No listener found for stream " + streamName,
189                 ErrorType.APPLICATION, ErrorTag.DATA_MISSING);
190         }
191
192         listener.setQueryParams(notificationQueryParams);
193
194         final DOMDataBroker dataBroker = handlersHolder.getDataBroker();
195         final DatabindProvider schemaHandler = handlersHolder.getDatabindProvider();
196         listener.setCloseVars(dataBroker, schemaHandler);
197         listener.listen(dataBroker, LogicalDatastoreType.valueOf(datastoreParam));
198
199         final URI uri = prepareUriByStreamName(uriInfo, streamName);
200         final EffectiveModelContext schemaContext = schemaHandler.currentContext().modelContext();
201         final String serializedPath = IdentifierCodec.serialize(listener.getPath(), schemaContext);
202
203         final MapEntryNode mapToStreams = RestconfStateStreams.dataChangeStreamEntry(listener.getPath(),
204                 listener.getStart(), listener.getOutputType(), uri, schemaContext, serializedPath);
205         final DOMDataTreeWriteTransaction writeTransaction = dataBroker.newWriteOnlyTransaction();
206         writeDataToDS(writeTransaction, mapToStreams);
207         submitData(writeTransaction);
208         return uri;
209     }
210
211     // FIXME: callers are utter duplicates, refactor them
212     private static void writeDataToDS(final DOMDataTreeWriteOperations tx, final MapEntryNode mapToStreams) {
213         // FIXME: use put() here
214         tx.merge(LogicalDatastoreType.OPERATIONAL, RestconfStateStreams.restconfStateStreamPath(mapToStreams.name()),
215             mapToStreams);
216     }
217
218     private static void submitData(final DOMDataTreeWriteTransaction readWriteTransaction) {
219         try {
220             readWriteTransaction.commit().get();
221         } catch (final InterruptedException | ExecutionException e) {
222             throw new RestconfDocumentedException("Problem while putting data to DS.", e);
223         }
224     }
225
226     /**
227      * Prepare map of URI parameter-values.
228      *
229      * @param identifier String identification of URI.
230      * @return Map od URI parameters and values.
231      */
232     private static Map<String, String> mapValuesFromUri(final String identifier) {
233         final var result = new HashMap<String, String>();
234         for (final String token : SLASH_SPLITTER.split(identifier)) {
235             final String[] paramToken = token.split("=");
236             if (paramToken.length == 2) {
237                 result.put(paramToken[0], paramToken[1]);
238             }
239         }
240         return result;
241     }
242 }