af6d5f7e6a2b011a4e93609c4fe9f26eb4b43a04
[netconf.git] / restconf / restconf-nb / src / main / java / org / opendaylight / restconf / nb / rfc8040 / streams / listeners / ListenersBroker.java
1 /*
2  * Copyright © 2019 FRINX s.r.o. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.restconf.nb.rfc8040.streams.listeners;
9
10 import static java.util.Objects.requireNonNull;
11
12 import com.google.common.collect.BiMap;
13 import com.google.common.collect.HashBiMap;
14 import java.util.Optional;
15 import java.util.concurrent.ExecutionException;
16 import java.util.concurrent.locks.StampedLock;
17 import javax.inject.Inject;
18 import javax.inject.Singleton;
19 import org.eclipse.jdt.annotation.Nullable;
20 import org.opendaylight.mdsal.dom.api.DOMMountPointService;
21 import org.opendaylight.restconf.nb.rfc8040.URLConstants;
22 import org.opendaylight.restconf.nb.rfc8040.rests.utils.RestconfStreamsConstants;
23 import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev140708.NotificationOutputTypeGrouping.NotificationOutputType;
24 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
25 import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
26 import org.opendaylight.yangtools.yang.model.api.NotificationDefinition;
27 import org.opendaylight.yangtools.yang.model.api.stmt.SchemaNodeIdentifier.Absolute;
28 import org.osgi.service.component.annotations.Activate;
29 import org.osgi.service.component.annotations.Component;
30 import org.slf4j.Logger;
31 import org.slf4j.LoggerFactory;
32
33 /**
34  * This singleton class is responsible for creation, removal and searching for {@link ListenerAdapter} or
35  * {@link NotificationListenerAdapter} listeners.
36  */
37 // FIXME: furthermore, this should be tied to ietf-restconf-monitoring, as the Strings used in its maps are stream
38 //        names. We essentially need a component which deals with allocation of stream names and their lifecycle and
39 //        the contents of /restconf-state/streams.
40 @Singleton
41 @Component(service = ListenersBroker.class, immediate = true)
42 public final class ListenersBroker {
43     private static final Logger LOG = LoggerFactory.getLogger(ListenersBroker.class);
44
45     private final StampedLock dataChangeListenersLock = new StampedLock();
46     private final StampedLock notificationListenersLock = new StampedLock();
47     private final StampedLock deviceNotificationListenersLock = new StampedLock();
48     private final BiMap<String, ListenerAdapter> dataChangeListeners = HashBiMap.create();
49     private final BiMap<String, NotificationListenerAdapter> notificationListeners = HashBiMap.create();
50     private final BiMap<String, DeviceNotificationListenerAdaptor> deviceNotificationListeners = HashBiMap.create();
51
52     @Inject
53     @Activate
54     public ListenersBroker() {
55
56     }
57
58     /**
59      * Gets {@link ListenerAdapter} specified by stream identification.
60      *
61      * @param streamName Stream name.
62      * @return {@link ListenerAdapter} specified by stream name or {@code null} if listener with specified stream name
63      *         does not exist.
64      * @throws NullPointerException in {@code streamName} is {@code null}
65      */
66     public @Nullable ListenerAdapter dataChangeListenerFor(final String streamName) {
67         requireNonNull(streamName);
68
69         final long stamp = dataChangeListenersLock.readLock();
70         try {
71             return dataChangeListeners.get(streamName);
72         } finally {
73             dataChangeListenersLock.unlockRead(stamp);
74         }
75     }
76
77     /**
78      * Gets {@link NotificationListenerAdapter} specified by stream name.
79      *
80      * @param streamName Stream name.
81      * @return {@link NotificationListenerAdapter} specified by stream name or {@code null} if listener with specified
82      *         stream name does not exist.
83      * @throws NullPointerException in {@code streamName} is {@code null}
84      */
85     public @Nullable NotificationListenerAdapter notificationListenerFor(final String streamName) {
86         requireNonNull(streamName);
87
88         final long stamp = notificationListenersLock.readLock();
89         try {
90             return notificationListeners.get(streamName);
91         } finally {
92             notificationListenersLock.unlockRead(stamp);
93         }
94     }
95
96     /**
97      * Get listener for device path.
98      *
99      * @param path name.
100      * @return {@link BaseListenerInterface} specified by stream name or {@code null} if listener with specified
101      *         stream name does not exist.
102      * @throws NullPointerException in {@code path} is {@code null}
103      */
104     public @Nullable BaseListenerInterface deviceNotificationListenerFor(final String path) {
105         requireNonNull(path);
106
107         final long stamp = deviceNotificationListenersLock.readLock();
108         try {
109             return deviceNotificationListeners.get(path);
110         } finally {
111             deviceNotificationListenersLock.unlockRead(stamp);
112         }
113     }
114
115     /**
116      * Get listener for stream-name.
117      *
118      * @param streamName Stream name.
119      * @return {@link NotificationListenerAdapter} or {@link ListenerAdapter} object wrapped in {@link Optional}
120      *     or {@link Optional#empty()} if listener with specified stream name doesn't exist.
121      */
122     public @Nullable BaseListenerInterface listenerFor(final String streamName) {
123         if (streamName.startsWith(RestconfStreamsConstants.NOTIFICATION_STREAM)) {
124             return notificationListenerFor(streamName);
125         } else if (streamName.startsWith(RestconfStreamsConstants.DATA_SUBSCRIPTION)) {
126             return dataChangeListenerFor(streamName);
127         } else {
128             return null;
129         }
130     }
131
132     /**
133      * Creates new {@link ListenerAdapter} listener using input stream name and path if such listener
134      * hasn't been created yet.
135      *
136      * @param path       Path to data in data repository.
137      * @param streamName Stream name.
138      * @param outputType Specific type of output for notifications - XML or JSON.
139      * @return Created or existing data-change listener adapter.
140      */
141     public ListenerAdapter registerDataChangeListener(final YangInstanceIdentifier path, final String streamName,
142             final NotificationOutputType outputType) {
143         requireNonNull(path);
144         requireNonNull(streamName);
145         requireNonNull(outputType);
146
147         final long stamp = dataChangeListenersLock.writeLock();
148         try {
149             return dataChangeListeners.computeIfAbsent(streamName,
150                 stream -> new ListenerAdapter(path, stream, outputType, this));
151         } finally {
152             dataChangeListenersLock.unlockWrite(stamp);
153         }
154     }
155
156     /**
157      * Creates new {@link NotificationDefinition} listener using input stream name and schema path
158      * if such listener haven't been created yet.
159      *
160      * @param schemaPath Schema path of YANG notification structure.
161      * @param streamName Stream name.
162      * @param outputType Specific type of output for notifications - XML or JSON.
163      * @return Created or existing notification listener adapter.
164      */
165     public NotificationListenerAdapter registerNotificationListener(final Absolute schemaPath,
166             final String streamName, final NotificationOutputType outputType) {
167         requireNonNull(schemaPath);
168         requireNonNull(streamName);
169         requireNonNull(outputType);
170
171         final long stamp = notificationListenersLock.writeLock();
172         try {
173             return notificationListeners.computeIfAbsent(streamName,
174                 stream -> new NotificationListenerAdapter(schemaPath, stream, outputType, this));
175         } finally {
176             notificationListenersLock.unlockWrite(stamp);
177         }
178     }
179
180     /**
181      * Creates new {@link DeviceNotificationListenerAdaptor} listener using input stream name and schema path
182      * if such listener haven't been created yet.
183      *
184      * @param streamName Stream name.
185      * @param outputType Specific type of output for notifications - XML or JSON.
186      * @param refSchemaCtx Schema context of node
187      * @param mountPointService Mount point service
188      * @return Created or existing device notification listener adapter.
189      */
190     public DeviceNotificationListenerAdaptor registerDeviceNotificationListener(final String streamName,
191         final NotificationOutputType outputType, final EffectiveModelContext refSchemaCtx,
192         final DOMMountPointService mountPointService, final YangInstanceIdentifier path) {
193
194         final long stamp = deviceNotificationListenersLock.writeLock();
195         try {
196             return deviceNotificationListeners.computeIfAbsent(streamName,
197                 stream -> new DeviceNotificationListenerAdaptor(streamName, outputType, refSchemaCtx,
198                     mountPointService, path, this));
199         } finally {
200             deviceNotificationListenersLock.unlockWrite(stamp);
201         }
202     }
203
204     /**
205      * Removal and closing of all data-change-event and notification listeners.
206      */
207     public synchronized void removeAndCloseAllListeners() {
208         final long stampNotifications = notificationListenersLock.writeLock();
209         final long stampDataChanges = dataChangeListenersLock.writeLock();
210         try {
211             removeAndCloseAllDataChangeListenersTemplate();
212             removeAndCloseAllNotificationListenersTemplate();
213         } finally {
214             dataChangeListenersLock.unlockWrite(stampDataChanges);
215             notificationListenersLock.unlockWrite(stampNotifications);
216         }
217     }
218
219     /**
220      * Closes and removes all data-change listeners.
221      */
222     public void removeAndCloseAllDataChangeListeners() {
223         final long stamp = dataChangeListenersLock.writeLock();
224         try {
225             removeAndCloseAllDataChangeListenersTemplate();
226         } finally {
227             dataChangeListenersLock.unlockWrite(stamp);
228         }
229     }
230
231     @SuppressWarnings("checkstyle:IllegalCatch")
232     private void removeAndCloseAllDataChangeListenersTemplate() {
233         dataChangeListeners.values()
234                 .forEach(listenerAdapter -> {
235                     try {
236                         listenerAdapter.close();
237                     } catch (final Exception exception) {
238                         LOG.error("Failed to close data-change listener {}.", listenerAdapter, exception);
239                         throw new IllegalStateException(String.format("Failed to close data-change listener %s.",
240                                 listenerAdapter), exception);
241                     }
242                 });
243         dataChangeListeners.clear();
244     }
245
246     /**
247      * Closes and removes all notification listeners.
248      */
249     public void removeAndCloseAllNotificationListeners() {
250         final long stamp = notificationListenersLock.writeLock();
251         try {
252             removeAndCloseAllNotificationListenersTemplate();
253         } finally {
254             notificationListenersLock.unlockWrite(stamp);
255         }
256     }
257
258     @SuppressWarnings("checkstyle:IllegalCatch")
259     private void removeAndCloseAllNotificationListenersTemplate() {
260         notificationListeners.values()
261                 .forEach(listenerAdapter -> {
262                     try {
263                         listenerAdapter.close();
264                     } catch (final Exception exception) {
265                         LOG.error("Failed to close notification listener {}.", listenerAdapter, exception);
266                         throw new IllegalStateException(String.format("Failed to close notification listener %s.",
267                                 listenerAdapter), exception);
268                     }
269                 });
270         notificationListeners.clear();
271     }
272
273     /**
274      * Removes and closes data-change listener of type {@link ListenerAdapter} specified in parameter.
275      *
276      * @param listener Listener to be closed and removed.
277      */
278     @SuppressWarnings("checkstyle:IllegalCatch")
279     public void removeAndCloseDataChangeListener(final ListenerAdapter listener) {
280         final long stamp = dataChangeListenersLock.writeLock();
281         try {
282             removeAndCloseDataChangeListenerTemplate(listener);
283         } catch (final Exception exception) {
284             LOG.error("Data-change listener {} cannot be closed.", listener, exception);
285         } finally {
286             dataChangeListenersLock.unlockWrite(stamp);
287         }
288     }
289
290     /**
291      * Removes and closes data-change listener of type {@link ListenerAdapter} specified in parameter.
292      *
293      * @param listener Listener to be closed and removed.
294      */
295     private void removeAndCloseDataChangeListenerTemplate(final ListenerAdapter listener) {
296         try {
297             requireNonNull(listener).close();
298             if (dataChangeListeners.inverse().remove(listener) == null) {
299                 LOG.warn("There isn't any data-change event stream that would match listener adapter {}.", listener);
300             }
301         } catch (final InterruptedException | ExecutionException exception) {
302             LOG.error("Data-change listener {} cannot be closed.", listener, exception);
303             throw new IllegalStateException(String.format(
304                     "Data-change listener %s cannot be closed.",
305                     listener), exception);
306         }
307     }
308
309     /**
310      * Removes and closes notification listener of type {@link NotificationListenerAdapter} specified in parameter.
311      *
312      * @param listener Listener to be closed and removed.
313      */
314     @SuppressWarnings("checkstyle:IllegalCatch")
315     public void removeAndCloseNotificationListener(final NotificationListenerAdapter listener) {
316         final long stamp = notificationListenersLock.writeLock();
317         try {
318             removeAndCloseNotificationListenerTemplate(listener);
319         } catch (final Exception exception) {
320             LOG.error("Notification listener {} cannot be closed.", listener, exception);
321         } finally {
322             notificationListenersLock.unlockWrite(stamp);
323         }
324     }
325
326     /**
327      * Removes and closes device notification listener of type {@link NotificationListenerAdapter}
328      * specified in parameter.
329      *
330      * @param listener Listener to be closed and removed.
331      */
332     @SuppressWarnings("checkstyle:IllegalCatch")
333     public void removeAndCloseDeviceNotificationListener(final DeviceNotificationListenerAdaptor listener) {
334         final long stamp = deviceNotificationListenersLock.writeLock();
335         try {
336             requireNonNull(listener);
337             if (deviceNotificationListeners.inverse().remove(listener) == null) {
338                 LOG.warn("There isn't any device notification stream that would match listener adapter {}.", listener);
339             }
340         } catch (final Exception exception) {
341             LOG.error("Device Notification listener {} cannot be closed.", listener, exception);
342         } finally {
343             deviceNotificationListenersLock.unlockWrite(stamp);
344         }
345     }
346
347     private void removeAndCloseNotificationListenerTemplate(final NotificationListenerAdapter listener) {
348         try {
349             requireNonNull(listener).close();
350             if (notificationListeners.inverse().remove(listener) == null) {
351                 LOG.warn("There isn't any notification stream that would match listener adapter {}.", listener);
352             }
353         } catch (final InterruptedException | ExecutionException exception) {
354             LOG.error("Notification listener {} cannot be closed.", listener, exception);
355             throw new IllegalStateException(String.format(
356                     "Notification listener %s cannot be closed.", listener),
357                     exception);
358         }
359     }
360
361     /**
362      * Removal and closing of general listener (data-change or notification listener).
363      *
364      * @param listener Listener to be closed and removed from cache.
365      */
366     void removeAndCloseListener(final BaseListenerInterface listener) {
367         requireNonNull(listener);
368         if (listener instanceof ListenerAdapter) {
369             removeAndCloseDataChangeListener((ListenerAdapter) listener);
370         } else if (listener instanceof NotificationListenerAdapter) {
371             removeAndCloseNotificationListener((NotificationListenerAdapter) listener);
372         }
373     }
374
375     /**
376      * Creates string representation of stream name from URI. Removes slash from URI in start and end positions,
377      * and optionally {@link URLConstants#BASE_PATH} prefix.
378      *
379      * @param uri URI for creation of stream name.
380      * @return String representation of stream name.
381      */
382     public static String createStreamNameFromUri(final String uri) {
383         String result = requireNonNull(uri);
384         while (true) {
385             if (result.startsWith(URLConstants.BASE_PATH)) {
386                 result = result.substring(URLConstants.BASE_PATH.length());
387             } else if (result.startsWith("/")) {
388                 result = result.substring(1);
389             } else {
390                 break;
391             }
392         }
393         if (result.endsWith("/")) {
394             result = result.substring(0, result.length() - 1);
395         }
396         return result;
397     }
398 }