2e3dc1b379d984c42ff2615008690c2cf44d49e9
[netconf.git] / restconf / restconf-nb / src / main / java / org / opendaylight / restconf / nb / rfc8040 / streams / ListenersBroker.java
1 /*
2  * Copyright © 2019 FRINX s.r.o. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.restconf.nb.rfc8040.streams;
9
10 import static com.google.common.base.Strings.isNullOrEmpty;
11 import static java.util.Objects.requireNonNull;
12
13 import com.google.common.collect.BiMap;
14 import com.google.common.collect.HashBiMap;
15 import com.google.common.collect.ImmutableSet;
16 import java.net.URI;
17 import java.util.Optional;
18 import java.util.concurrent.ExecutionException;
19 import java.util.concurrent.locks.StampedLock;
20 import javax.ws.rs.core.UriInfo;
21 import org.eclipse.jdt.annotation.NonNull;
22 import org.eclipse.jdt.annotation.Nullable;
23 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
24 import org.opendaylight.mdsal.dom.api.DOMDataBroker;
25 import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteOperations;
26 import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteTransaction;
27 import org.opendaylight.mdsal.dom.api.DOMMountPointService;
28 import org.opendaylight.restconf.common.errors.RestconfDocumentedException;
29 import org.opendaylight.restconf.nb.rfc8040.NotificationQueryParams;
30 import org.opendaylight.restconf.nb.rfc8040.URLConstants;
31 import org.opendaylight.restconf.nb.rfc8040.rests.services.impl.RestconfStreamsSubscriptionServiceImpl.HandlersHolder;
32 import org.opendaylight.restconf.nb.rfc8040.utils.parser.IdentifierCodec;
33 import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev140708.CreateDataChangeEventSubscriptionInput1.Scope;
34 import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev140708.NotificationOutputTypeGrouping.NotificationOutputType;
35 import org.opendaylight.yangtools.yang.common.ErrorTag;
36 import org.opendaylight.yangtools.yang.common.ErrorType;
37 import org.opendaylight.yangtools.yang.common.QName;
38 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
39 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
40 import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
41 import org.opendaylight.yangtools.yang.model.api.NotificationDefinition;
42 import org.opendaylight.yangtools.yang.model.api.stmt.NotificationEffectiveStatement;
43 import org.slf4j.Logger;
44 import org.slf4j.LoggerFactory;
45
46 /**
47  * This singleton class is responsible for creation, removal and searching for {@link ListenerAdapter} or
48  * {@link NotificationListenerAdapter} listeners.
49  */
50 // FIXME: furthermore, this should be tied to ietf-restconf-monitoring, as the Strings used in its maps are stream
51 //        names. We essentially need a component which deals with allocation of stream names and their lifecycle and
52 //        the contents of /restconf-state/streams.
53 public abstract sealed class ListenersBroker {
54     /**
55      * A ListenersBroker working with Server-Sent Events.
56      */
57     public static final class ServerSentEvents extends ListenersBroker {
58         @Override
59         public URI prepareUriByStreamName(final UriInfo uriInfo, final String streamName) {
60             return uriInfo.getBaseUriBuilder()
61                 .replacePath(URLConstants.BASE_PATH + '/' + URLConstants.STREAMS_SUBPATH + '/' + streamName)
62                 .build();
63         }
64     }
65
66     /**
67      * A ListenersBroker working with WebSockets.
68      */
69     public static final class WebSockets extends ListenersBroker {
70         @Override
71         public URI prepareUriByStreamName(final UriInfo uriInfo, final String streamName) {
72             final var scheme = switch (uriInfo.getAbsolutePath().getScheme()) {
73                 // Secured HTTP goes to Secured WebSockets
74                 case "https" -> "wss";
75                 // Unsecured HTTP and others go to unsecured WebSockets
76                 default -> "ws";
77             };
78
79             return uriInfo.getBaseUriBuilder()
80                 .scheme(scheme)
81                 .replacePath(URLConstants.BASE_PATH + '/' + URLConstants.STREAMS_SUBPATH + '/' + streamName)
82                 .build();
83         }
84     }
85
86     private static final Logger LOG = LoggerFactory.getLogger(ListenersBroker.class);
87
88     private final StampedLock dataChangeListenersLock = new StampedLock();
89     private final StampedLock notificationListenersLock = new StampedLock();
90     private final StampedLock deviceNotificationListenersLock = new StampedLock();
91     private final BiMap<String, ListenerAdapter> dataChangeListeners = HashBiMap.create();
92     private final BiMap<String, NotificationListenerAdapter> notificationListeners = HashBiMap.create();
93     private final BiMap<String, DeviceNotificationListenerAdaptor> deviceNotificationListeners = HashBiMap.create();
94
95     private ListenersBroker() {
96         // Hidden on purpose
97     }
98
99     /**
100      * Gets {@link ListenerAdapter} specified by stream identification.
101      *
102      * @param streamName Stream name.
103      * @return {@link ListenerAdapter} specified by stream name or {@code null} if listener with specified stream name
104      *         does not exist.
105      * @throws NullPointerException in {@code streamName} is {@code null}
106      */
107     public final @Nullable ListenerAdapter dataChangeListenerFor(final String streamName) {
108         requireNonNull(streamName);
109
110         final long stamp = dataChangeListenersLock.readLock();
111         try {
112             return dataChangeListeners.get(streamName);
113         } finally {
114             dataChangeListenersLock.unlockRead(stamp);
115         }
116     }
117
118     /**
119      * Gets {@link NotificationListenerAdapter} specified by stream name.
120      *
121      * @param streamName Stream name.
122      * @return {@link NotificationListenerAdapter} specified by stream name or {@code null} if listener with specified
123      *         stream name does not exist.
124      * @throws NullPointerException in {@code streamName} is {@code null}
125      */
126     public final @Nullable NotificationListenerAdapter notificationListenerFor(final String streamName) {
127         requireNonNull(streamName);
128
129         final long stamp = notificationListenersLock.readLock();
130         try {
131             return notificationListeners.get(streamName);
132         } finally {
133             notificationListenersLock.unlockRead(stamp);
134         }
135     }
136
137     /**
138      * Get listener for device path.
139      *
140      * @param streamName name.
141      * @return {@link DeviceNotificationListenerAdaptor} specified by stream name or {@code null} if listener with
142      *         specified stream name does not exist.
143      * @throws NullPointerException in {@code path} is {@code null}
144      */
145     public final @Nullable DeviceNotificationListenerAdaptor deviceNotificationListenerFor(final String streamName) {
146         requireNonNull(streamName);
147
148         final long stamp = deviceNotificationListenersLock.readLock();
149         try {
150             return deviceNotificationListeners.get(streamName);
151         } finally {
152             deviceNotificationListenersLock.unlockRead(stamp);
153         }
154     }
155
156     /**
157      * Get listener for stream-name.
158      *
159      * @param streamName Stream name.
160      * @return {@link NotificationListenerAdapter} or {@link ListenerAdapter} object wrapped in {@link Optional}
161      *     or {@link Optional#empty()} if listener with specified stream name doesn't exist.
162      */
163     public final @Nullable BaseListenerInterface listenerFor(final String streamName) {
164         if (streamName.startsWith(RestconfStreamsConstants.NOTIFICATION_STREAM)) {
165             return notificationListenerFor(streamName);
166         } else if (streamName.startsWith(RestconfStreamsConstants.DATA_SUBSCRIPTION)) {
167             return dataChangeListenerFor(streamName);
168         } else if (streamName.startsWith(RestconfStreamsConstants.DEVICE_NOTIFICATION_STREAM)) {
169             return deviceNotificationListenerFor(streamName);
170         } else {
171             return null;
172         }
173     }
174
175     /**
176      * Creates new {@link ListenerAdapter} listener using input stream name and path if such listener
177      * hasn't been created yet.
178      *
179      * @param path       Path to data in data repository.
180      * @param outputType Specific type of output for notifications - XML or JSON.
181      * @return Created or existing data-change listener adapter.
182      */
183     public final ListenerAdapter registerDataChangeListener(final EffectiveModelContext modelContext,
184             final LogicalDatastoreType datastore, final YangInstanceIdentifier path, final Scope scope,
185             final NotificationOutputType outputType) {
186         final var sb = new StringBuilder(RestconfStreamsConstants.DATA_SUBSCRIPTION)
187             .append('/').append(createStreamNameFromUri(IdentifierCodec.serialize(path, modelContext)))
188             .append('/').append(RestconfStreamsConstants.DATASTORE_PARAM_NAME).append('=').append(datastore)
189             .append('/').append(RestconfStreamsConstants.SCOPE_PARAM_NAME).append('=').append(scope);
190         if (outputType != NotificationOutputType.XML) {
191             sb.append('/').append(outputType.getName());
192         }
193
194         final long stamp = dataChangeListenersLock.writeLock();
195         try {
196             return dataChangeListeners.computeIfAbsent(sb.toString(),
197                 streamName -> new ListenerAdapter(datastore, path, streamName, outputType, this));
198         } finally {
199             dataChangeListenersLock.unlockWrite(stamp);
200         }
201     }
202
203     /**
204      * Creates new {@link NotificationDefinition} listener using input stream name and schema path
205      * if such listener haven't been created yet.
206      *
207      * @param refSchemaCtx reference {@link EffectiveModelContext}
208      * @param notifications {@link QName}s of accepted YANG notifications
209      * @param outputType Specific type of output for notifications - XML or JSON.
210      * @return Created or existing notification listener adapter.
211      */
212     public final NotificationListenerAdapter registerNotificationListener(final EffectiveModelContext refSchemaCtx,
213             final ImmutableSet<QName> notifications, final NotificationOutputType outputType) {
214         final var sb = new StringBuilder(RestconfStreamsConstants.NOTIFICATION_STREAM).append('/');
215         var haveFirst = false;
216         for (var qname : notifications) {
217             final var module = refSchemaCtx.findModuleStatement(qname.getModule())
218                 .orElseThrow(() -> new RestconfDocumentedException(qname + " refers to an unknown module",
219                     ErrorType.APPLICATION, ErrorTag.INVALID_VALUE));
220             final var stmt = module.findSchemaTreeNode(qname)
221                 .orElseThrow(() -> new RestconfDocumentedException(qname + " refers to an notification",
222                     ErrorType.APPLICATION, ErrorTag.INVALID_VALUE));
223             if (!(stmt instanceof NotificationEffectiveStatement)) {
224                 throw new RestconfDocumentedException(qname + " refers to a non-notification",
225                     ErrorType.APPLICATION, ErrorTag.INVALID_VALUE);
226             }
227
228             if (haveFirst) {
229                 sb.append(',');
230             } else {
231                 haveFirst = true;
232             }
233             sb.append(module.argument().getLocalName()).append(':').append(qname.getLocalName());
234         }
235         if (outputType != NotificationOutputType.XML) {
236             sb.append('/').append(outputType.getName());
237         }
238
239         final long stamp = notificationListenersLock.writeLock();
240         try {
241             return notificationListeners.computeIfAbsent(sb.toString(),
242                 streamName -> new NotificationListenerAdapter(notifications, streamName, outputType, this));
243         } finally {
244             notificationListenersLock.unlockWrite(stamp);
245         }
246     }
247
248     /**
249      * Creates new {@link DeviceNotificationListenerAdaptor} listener using input stream name and schema path
250      * if such listener haven't been created yet.
251      *
252      * @param deviceName Device name.
253      * @param outputType Specific type of output for notifications - XML or JSON.
254      * @param refSchemaCtx Schema context of node
255      * @param mountPointService Mount point service
256      * @return Created or existing device notification listener adapter.
257      */
258     public final DeviceNotificationListenerAdaptor registerDeviceNotificationListener(final String deviceName,
259             final NotificationOutputType outputType, final EffectiveModelContext refSchemaCtx,
260             final DOMMountPointService mountPointService, final YangInstanceIdentifier path) {
261         final var sb = new StringBuilder(RestconfStreamsConstants.DEVICE_NOTIFICATION_STREAM).append('/')
262             .append(deviceName);
263
264         final long stamp = deviceNotificationListenersLock.writeLock();
265         try {
266             return deviceNotificationListeners.computeIfAbsent(sb.toString(),
267                 streamName -> new DeviceNotificationListenerAdaptor(streamName, outputType, refSchemaCtx,
268                     mountPointService, path, this));
269         } finally {
270             deviceNotificationListenersLock.unlockWrite(stamp);
271         }
272     }
273
274     /**
275      * Removal and closing of all data-change-event and notification listeners.
276      */
277     public final synchronized void removeAndCloseAllListeners() {
278         final long stampNotifications = notificationListenersLock.writeLock();
279         final long stampDataChanges = dataChangeListenersLock.writeLock();
280         try {
281             removeAndCloseAllDataChangeListenersTemplate();
282             removeAndCloseAllNotificationListenersTemplate();
283         } finally {
284             dataChangeListenersLock.unlockWrite(stampDataChanges);
285             notificationListenersLock.unlockWrite(stampNotifications);
286         }
287     }
288
289     /**
290      * Closes and removes all data-change listeners.
291      */
292     public final void removeAndCloseAllDataChangeListeners() {
293         final long stamp = dataChangeListenersLock.writeLock();
294         try {
295             removeAndCloseAllDataChangeListenersTemplate();
296         } finally {
297             dataChangeListenersLock.unlockWrite(stamp);
298         }
299     }
300
301     @SuppressWarnings("checkstyle:IllegalCatch")
302     private void removeAndCloseAllDataChangeListenersTemplate() {
303         dataChangeListeners.values().forEach(listenerAdapter -> {
304             try {
305                 listenerAdapter.close();
306             } catch (Exception e) {
307                 LOG.error("Failed to close data-change listener {}.", listenerAdapter, e);
308                 throw new IllegalStateException("Failed to close data-change listener %s.".formatted(listenerAdapter),
309                     e);
310             }
311         });
312         dataChangeListeners.clear();
313     }
314
315     /**
316      * Closes and removes all notification listeners.
317      */
318     public final void removeAndCloseAllNotificationListeners() {
319         final long stamp = notificationListenersLock.writeLock();
320         try {
321             removeAndCloseAllNotificationListenersTemplate();
322         } finally {
323             notificationListenersLock.unlockWrite(stamp);
324         }
325     }
326
327     @SuppressWarnings("checkstyle:IllegalCatch")
328     private void removeAndCloseAllNotificationListenersTemplate() {
329         notificationListeners.values().forEach(listenerAdapter -> {
330             try {
331                 listenerAdapter.close();
332             } catch (Exception e) {
333                 LOG.error("Failed to close notification listener {}.", listenerAdapter, e);
334                 throw new IllegalStateException("Failed to close notification listener %s.".formatted(listenerAdapter),
335                     e);
336             }
337         });
338         notificationListeners.clear();
339     }
340
341     /**
342      * Removes and closes data-change listener of type {@link ListenerAdapter} specified in parameter.
343      *
344      * @param listener Listener to be closed and removed.
345      */
346     @SuppressWarnings("checkstyle:IllegalCatch")
347     public final void removeAndCloseDataChangeListener(final ListenerAdapter listener) {
348         final long stamp = dataChangeListenersLock.writeLock();
349         try {
350             removeAndCloseDataChangeListenerTemplate(listener);
351         } catch (Exception exception) {
352             LOG.error("Data-change listener {} cannot be closed.", listener, exception);
353         } finally {
354             dataChangeListenersLock.unlockWrite(stamp);
355         }
356     }
357
358     /**
359      * Removes and closes data-change listener of type {@link ListenerAdapter} specified in parameter.
360      *
361      * @param listener Listener to be closed and removed.
362      */
363     private void removeAndCloseDataChangeListenerTemplate(final ListenerAdapter listener) {
364         try {
365             requireNonNull(listener).close();
366             if (dataChangeListeners.inverse().remove(listener) == null) {
367                 LOG.warn("There isn't any data-change event stream that would match listener adapter {}.", listener);
368             }
369         } catch (InterruptedException | ExecutionException e) {
370             LOG.error("Data-change listener {} cannot be closed.", listener, e);
371             throw new IllegalStateException("Data-change listener %s cannot be closed.".formatted(listener), e);
372         }
373     }
374
375     /**
376      * Removes and closes notification listener of type {@link NotificationListenerAdapter} specified in parameter.
377      *
378      * @param listener Listener to be closed and removed.
379      */
380     @SuppressWarnings("checkstyle:IllegalCatch")
381     public final void removeAndCloseNotificationListener(final NotificationListenerAdapter listener) {
382         final long stamp = notificationListenersLock.writeLock();
383         try {
384             removeAndCloseNotificationListenerTemplate(listener);
385         } catch (Exception e) {
386             LOG.error("Notification listener {} cannot be closed.", listener, e);
387         } finally {
388             notificationListenersLock.unlockWrite(stamp);
389         }
390     }
391
392     /**
393      * Removes and closes device notification listener of type {@link NotificationListenerAdapter}
394      * specified in parameter.
395      *
396      * @param listener Listener to be closed and removed.
397      */
398     @SuppressWarnings("checkstyle:IllegalCatch")
399     public final void removeAndCloseDeviceNotificationListener(final DeviceNotificationListenerAdaptor listener) {
400         final long stamp = deviceNotificationListenersLock.writeLock();
401         try {
402             requireNonNull(listener);
403             if (deviceNotificationListeners.inverse().remove(listener) == null) {
404                 LOG.warn("There isn't any device notification stream that would match listener adapter {}.", listener);
405             }
406         } catch (final Exception exception) {
407             LOG.error("Device Notification listener {} cannot be closed.", listener, exception);
408         } finally {
409             deviceNotificationListenersLock.unlockWrite(stamp);
410         }
411     }
412
413     private void removeAndCloseNotificationListenerTemplate(final NotificationListenerAdapter listener) {
414         try {
415             requireNonNull(listener).close();
416             if (notificationListeners.inverse().remove(listener) == null) {
417                 LOG.warn("There isn't any notification stream that would match listener adapter {}.", listener);
418             }
419         } catch (InterruptedException | ExecutionException e) {
420             LOG.error("Notification listener {} cannot be closed.", listener, e);
421             throw new IllegalStateException("Notification listener %s cannot be closed.".formatted(listener), e);
422         }
423     }
424
425     /**
426      * Removal and closing of general listener (data-change or notification listener).
427      *
428      * @param listener Listener to be closed and removed from cache.
429      */
430     final void removeAndCloseListener(final BaseListenerInterface listener) {
431         requireNonNull(listener);
432         if (listener instanceof ListenerAdapter) {
433             removeAndCloseDataChangeListener((ListenerAdapter) listener);
434         } else if (listener instanceof NotificationListenerAdapter) {
435             removeAndCloseNotificationListener((NotificationListenerAdapter) listener);
436         }
437     }
438
439     /**
440      * Creates string representation of stream name from URI. Removes slash from URI in start and end positions,
441      * and optionally {@link URLConstants#BASE_PATH} prefix.
442      *
443      * @param uri URI for creation of stream name.
444      * @return String representation of stream name.
445      */
446     private static String createStreamNameFromUri(final String uri) {
447         String result = requireNonNull(uri);
448         while (true) {
449             if (result.startsWith(URLConstants.BASE_PATH)) {
450                 result = result.substring(URLConstants.BASE_PATH.length());
451             } else if (result.startsWith("/")) {
452                 result = result.substring(1);
453             } else {
454                 break;
455             }
456         }
457         if (result.endsWith("/")) {
458             result = result.substring(0, result.length() - 1);
459         }
460         return result;
461     }
462
463     /**
464      * Prepare URL from base name and stream name.
465      *
466      * @param uriInfo base URL information
467      * @param streamName name of stream for create
468      * @return final URL
469      */
470     public abstract @NonNull URI prepareUriByStreamName(UriInfo uriInfo, String streamName);
471
472     /**
473      * Register listener by streamName in identifier to listen to yang notifications, and put or delete information
474      * about listener to DS according to ietf-restconf-monitoring.
475      *
476      * @param identifier              Name of the stream.
477      * @param uriInfo                 URI information.
478      * @param notificationQueryParams Query parameters of notification.
479      * @param handlersHolder          Holder of handlers for notifications.
480      * @return Stream location for listening.
481      */
482     public final @NonNull URI subscribeToYangStream(final String identifier, final UriInfo uriInfo,
483             final NotificationQueryParams notificationQueryParams, final HandlersHolder handlersHolder) {
484         final String streamName = createStreamNameFromUri(identifier);
485         if (isNullOrEmpty(streamName)) {
486             throw new RestconfDocumentedException("Stream name is empty.", ErrorType.PROTOCOL, ErrorTag.INVALID_VALUE);
487         }
488
489         final var notificationListenerAdapter = notificationListenerFor(streamName);
490         if (notificationListenerAdapter == null) {
491             throw new RestconfDocumentedException("Stream with name %s was not found.".formatted(streamName),
492                 ErrorType.PROTOCOL, ErrorTag.UNKNOWN_ELEMENT);
493         }
494
495         final URI uri = prepareUriByStreamName(uriInfo, streamName);
496         notificationListenerAdapter.setQueryParams(notificationQueryParams);
497         notificationListenerAdapter.listen(handlersHolder.notificationService());
498         final DOMDataBroker dataBroker = handlersHolder.dataBroker();
499         notificationListenerAdapter.setCloseVars(dataBroker, handlersHolder.databindProvider());
500         final MapEntryNode mapToStreams = RestconfStateStreams.notificationStreamEntry(streamName,
501             notificationListenerAdapter.qnames(), notificationListenerAdapter.getStart(),
502             notificationListenerAdapter.getOutputType(), uri);
503
504         // FIXME: how does this correlate with the transaction notificationListenerAdapter.close() will do?
505         final DOMDataTreeWriteTransaction writeTransaction = dataBroker.newWriteOnlyTransaction();
506         writeDataToDS(writeTransaction, mapToStreams);
507         submitData(writeTransaction);
508         return uri;
509     }
510
511     /**
512      * Register listener by streamName in identifier to listen to data change notifications, and put or delete
513      * information about listener to DS according to ietf-restconf-monitoring.
514      *
515      * @param identifier              Identifier as stream name.
516      * @param uriInfo                 Base URI information.
517      * @param notificationQueryParams Query parameters of notification.
518      * @param handlersHolder          Holder of handlers for notifications.
519      * @return Location for listening.
520      */
521     public final URI subscribeToDataStream(final String identifier, final UriInfo uriInfo,
522             final NotificationQueryParams notificationQueryParams, final HandlersHolder handlersHolder) {
523         final var streamName = createStreamNameFromUri(identifier);
524         final var listener = dataChangeListenerFor(streamName);
525         if (listener == null) {
526             throw new RestconfDocumentedException("No listener found for stream " + streamName,
527                 ErrorType.APPLICATION, ErrorTag.DATA_MISSING);
528         }
529
530         listener.setQueryParams(notificationQueryParams);
531
532         final var dataBroker = handlersHolder.dataBroker();
533         final var schemaHandler = handlersHolder.databindProvider();
534         listener.setCloseVars(dataBroker, schemaHandler);
535         listener.listen(dataBroker);
536
537         final var uri = prepareUriByStreamName(uriInfo, streamName);
538         final var schemaContext = schemaHandler.currentContext().modelContext();
539         final var serializedPath = IdentifierCodec.serialize(listener.getPath(), schemaContext);
540
541         final var mapToStreams = RestconfStateStreams.dataChangeStreamEntry(listener.getPath(),
542                 listener.getStart(), listener.getOutputType(), uri, schemaContext, serializedPath);
543         final var writeTransaction = dataBroker.newWriteOnlyTransaction();
544         writeDataToDS(writeTransaction, mapToStreams);
545         submitData(writeTransaction);
546         return uri;
547     }
548
549     // FIXME: callers are utter duplicates, refactor them
550     private static void writeDataToDS(final DOMDataTreeWriteOperations tx, final MapEntryNode mapToStreams) {
551         // FIXME: use put() here
552         tx.merge(LogicalDatastoreType.OPERATIONAL, RestconfStateStreams.restconfStateStreamPath(mapToStreams.name()),
553             mapToStreams);
554     }
555
556     private static void submitData(final DOMDataTreeWriteTransaction readWriteTransaction) {
557         try {
558             readWriteTransaction.commit().get();
559         } catch (final InterruptedException | ExecutionException e) {
560             throw new RestconfDocumentedException("Problem while putting data to DS.", e);
561         }
562     }
563 }