Netconf Device Notification
[netconf.git] / restconf / restconf-nb / src / main / java / org / opendaylight / restconf / nb / rfc8040 / streams / listeners / ListenersBroker.java
1 /*
2  * Copyright © 2019 FRINX s.r.o. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.restconf.nb.rfc8040.streams.listeners;
9
10 import static java.util.Objects.requireNonNull;
11
12 import com.google.common.annotations.VisibleForTesting;
13 import com.google.common.collect.BiMap;
14 import com.google.common.collect.HashBiMap;
15 import com.google.common.collect.ImmutableSet;
16 import java.util.Map;
17 import java.util.Optional;
18 import java.util.Set;
19 import java.util.concurrent.ExecutionException;
20 import java.util.concurrent.locks.StampedLock;
21 import java.util.function.Function;
22 import org.opendaylight.mdsal.dom.api.DOMMountPointService;
23 import org.opendaylight.restconf.nb.rfc8040.rests.utils.RestconfStreamsConstants;
24 import org.opendaylight.restconf.nb.rfc8040.utils.RestconfConstants;
25 import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev140708.NotificationOutputTypeGrouping.NotificationOutputType;
26 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
27 import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
28 import org.opendaylight.yangtools.yang.model.api.NotificationDefinition;
29 import org.opendaylight.yangtools.yang.model.api.stmt.SchemaNodeIdentifier.Absolute;
30 import org.slf4j.Logger;
31 import org.slf4j.LoggerFactory;
32
33 /**
34  * This singleton class is responsible for creation, removal and searching for {@link ListenerAdapter} or
35  * {@link NotificationListenerAdapter} listeners.
36  */
37 // FIXME: this should be a component
38 public final class ListenersBroker {
39     private static final class Holder {
40         // FIXME: remove this global singleton
41         static final ListenersBroker INSTANCE = new ListenersBroker();
42     }
43
44     private static final Logger LOG = LoggerFactory.getLogger(ListenersBroker.class);
45
46     private final StampedLock dataChangeListenersLock = new StampedLock();
47     private final StampedLock notificationListenersLock = new StampedLock();
48     private final StampedLock deviceNotificationListenersLock = new StampedLock();
49     private final BiMap<String, ListenerAdapter> dataChangeListeners = HashBiMap.create();
50     private final BiMap<String, NotificationListenerAdapter> notificationListeners = HashBiMap.create();
51     private final BiMap<String, DeviceNotificationListenerAdaptor> deviceNotificationListeners = HashBiMap.create();
52
53
54     private ListenersBroker() {
55
56     }
57
58     /**
59      * Creation of the singleton listeners broker.
60      *
61      * @return Reusable instance of {@link ListenersBroker}.
62      */
63     public static ListenersBroker getInstance() {
64         return Holder.INSTANCE;
65     }
66
67     /**
68      * Returns set of all data-change-event streams.
69      */
70     public Set<String> getDataChangeStreams() {
71         final long stamp = dataChangeListenersLock.readLock();
72         try {
73             return ImmutableSet.copyOf(dataChangeListeners.keySet());
74         } finally {
75             dataChangeListenersLock.unlockRead(stamp);
76         }
77     }
78
79     /**
80      * Returns set of all notification streams.
81      */
82     public Set<String> getNotificationStreams() {
83         final long stamp = notificationListenersLock.readLock();
84         try {
85             return ImmutableSet.copyOf(notificationListeners.keySet());
86         } finally {
87             notificationListenersLock.unlockRead(stamp);
88         }
89     }
90
91     /**
92      * Gets {@link ListenerAdapter} specified by stream identification.
93      *
94      * @param streamName Stream name.
95      * @return {@link ListenerAdapter} specified by stream name wrapped in {@link Optional} or {@link Optional#empty()}
96      *     if listener with specified stream name doesn't exist.
97      */
98     public Optional<ListenerAdapter> getDataChangeListenerFor(final String streamName) {
99         final long stamp = dataChangeListenersLock.readLock();
100         try {
101             return Optional.ofNullable(dataChangeListeners.get(requireNonNull(streamName)));
102         } finally {
103             dataChangeListenersLock.unlockRead(stamp);
104         }
105     }
106
107     /**
108      * Gets {@link NotificationListenerAdapter} specified by stream name.
109      *
110      * @param streamName Stream name.
111      * @return {@link NotificationListenerAdapter} specified by stream name wrapped in {@link Optional}
112      *     or {@link Optional#empty()} if listener with specified stream name doesn't exist.
113      */
114     public Optional<NotificationListenerAdapter> getNotificationListenerFor(final String streamName) {
115         final long stamp = notificationListenersLock.readLock();
116         try {
117             return Optional.ofNullable(notificationListeners.get(requireNonNull(streamName)));
118         } finally {
119             notificationListenersLock.unlockRead(stamp);
120         }
121     }
122
123     /**
124      * Get listener for device path.
125      *
126      * @param path name.
127      * @return {@link NotificationListenerAdapter} or {@link ListenerAdapter} object wrapped in {@link Optional}
128      *     or {@link Optional#empty()} if listener with specified path doesn't exist.
129      */
130     public Optional<BaseListenerInterface> getDeviceNotificationListenerFor(final String path) {
131         final long stamp = deviceNotificationListenersLock.readLock();
132         try {
133             return Optional.ofNullable(deviceNotificationListeners.get(requireNonNull(path)));
134         } finally {
135             deviceNotificationListenersLock.unlockRead(stamp);
136         }
137     }
138
139     /**
140      * Get listener for stream-name.
141      *
142      * @param streamName Stream name.
143      * @return {@link NotificationListenerAdapter} or {@link ListenerAdapter} object wrapped in {@link Optional}
144      *     or {@link Optional#empty()} if listener with specified stream name doesn't exist.
145      */
146     public Optional<BaseListenerInterface> getListenerFor(final String streamName) {
147         if (streamName.startsWith(RestconfStreamsConstants.NOTIFICATION_STREAM)) {
148             return getNotificationListenerFor(streamName).map(Function.identity());
149         } else if (streamName.startsWith(RestconfStreamsConstants.DATA_SUBSCRIPTION)) {
150             return getDataChangeListenerFor(streamName).map(Function.identity());
151         } else {
152             return Optional.empty();
153         }
154     }
155
156     /**
157      * Creates new {@link ListenerAdapter} listener using input stream name and path if such listener
158      * hasn't been created yet.
159      *
160      * @param path       Path to data in data repository.
161      * @param streamName Stream name.
162      * @param outputType Specific type of output for notifications - XML or JSON.
163      * @return Created or existing data-change listener adapter.
164      */
165     public ListenerAdapter registerDataChangeListener(final YangInstanceIdentifier path, final String streamName,
166             final NotificationOutputType outputType) {
167         requireNonNull(path);
168         requireNonNull(streamName);
169         requireNonNull(outputType);
170
171         final long stamp = dataChangeListenersLock.writeLock();
172         try {
173             return dataChangeListeners.computeIfAbsent(streamName,
174                 stream -> new ListenerAdapter(path, stream, outputType));
175         } finally {
176             dataChangeListenersLock.unlockWrite(stamp);
177         }
178     }
179
180     /**
181      * Creates new {@link NotificationDefinition} listener using input stream name and schema path
182      * if such listener haven't been created yet.
183      *
184      * @param schemaPath Schema path of YANG notification structure.
185      * @param streamName Stream name.
186      * @param outputType Specific type of output for notifications - XML or JSON.
187      * @return Created or existing notification listener adapter.
188      */
189     public NotificationListenerAdapter registerNotificationListener(final Absolute schemaPath,
190             final String streamName, final NotificationOutputType outputType) {
191         requireNonNull(schemaPath);
192         requireNonNull(streamName);
193         requireNonNull(outputType);
194
195         final long stamp = notificationListenersLock.writeLock();
196         try {
197             return notificationListeners.computeIfAbsent(streamName,
198                 stream -> new NotificationListenerAdapter(schemaPath, stream, outputType));
199         } finally {
200             notificationListenersLock.unlockWrite(stamp);
201         }
202     }
203
204     /**
205      * Creates new {@link DeviceNotificationListenerAdaptor} listener using input stream name and schema path
206      * if such listener haven't been created yet.
207      *
208      * @param streamName Stream name.
209      * @param outputType Specific type of output for notifications - XML or JSON.
210      * @param refSchemaCtx Schema context of node
211      * @param mountPointService Mount point service
212      * @return Created or existing device notification listener adapter.
213      */
214     public DeviceNotificationListenerAdaptor registerDeviceNotificationListener(final String streamName,
215         final NotificationOutputType outputType, final EffectiveModelContext refSchemaCtx,
216         final DOMMountPointService mountPointService, final YangInstanceIdentifier path) {
217
218         final long stamp = deviceNotificationListenersLock.writeLock();
219         try {
220             return deviceNotificationListeners.computeIfAbsent(streamName,
221                 stream -> new DeviceNotificationListenerAdaptor(streamName, outputType, refSchemaCtx,
222                     mountPointService, path));
223         } finally {
224             deviceNotificationListenersLock.unlockWrite(stamp);
225         }
226     }
227
228     /**
229      * Removal and closing of all data-change-event and notification listeners.
230      */
231     public synchronized void removeAndCloseAllListeners() {
232         final long stampNotifications = notificationListenersLock.writeLock();
233         final long stampDataChanges = dataChangeListenersLock.writeLock();
234         try {
235             removeAndCloseAllDataChangeListenersTemplate();
236             removeAndCloseAllNotificationListenersTemplate();
237         } finally {
238             dataChangeListenersLock.unlockWrite(stampDataChanges);
239             notificationListenersLock.unlockWrite(stampNotifications);
240         }
241     }
242
243     /**
244      * Closes and removes all data-change listeners.
245      */
246     public void removeAndCloseAllDataChangeListeners() {
247         final long stamp = dataChangeListenersLock.writeLock();
248         try {
249             removeAndCloseAllDataChangeListenersTemplate();
250         } finally {
251             dataChangeListenersLock.unlockWrite(stamp);
252         }
253     }
254
255     @SuppressWarnings("checkstyle:IllegalCatch")
256     private void removeAndCloseAllDataChangeListenersTemplate() {
257         dataChangeListeners.values()
258                 .forEach(listenerAdapter -> {
259                     try {
260                         listenerAdapter.close();
261                     } catch (final Exception exception) {
262                         LOG.error("Failed to close data-change listener {}.", listenerAdapter, exception);
263                         throw new IllegalStateException(String.format("Failed to close data-change listener %s.",
264                                 listenerAdapter), exception);
265                     }
266                 });
267         dataChangeListeners.clear();
268     }
269
270     /**
271      * Closes and removes all notification listeners.
272      */
273     public void removeAndCloseAllNotificationListeners() {
274         final long stamp = notificationListenersLock.writeLock();
275         try {
276             removeAndCloseAllNotificationListenersTemplate();
277         } finally {
278             notificationListenersLock.unlockWrite(stamp);
279         }
280     }
281
282     @SuppressWarnings("checkstyle:IllegalCatch")
283     private void removeAndCloseAllNotificationListenersTemplate() {
284         notificationListeners.values()
285                 .forEach(listenerAdapter -> {
286                     try {
287                         listenerAdapter.close();
288                     } catch (final Exception exception) {
289                         LOG.error("Failed to close notification listener {}.", listenerAdapter, exception);
290                         throw new IllegalStateException(String.format("Failed to close notification listener %s.",
291                                 listenerAdapter), exception);
292                     }
293                 });
294         notificationListeners.clear();
295     }
296
297     /**
298      * Removes and closes data-change listener of type {@link ListenerAdapter} specified in parameter.
299      *
300      * @param listener Listener to be closed and removed.
301      */
302     @SuppressWarnings("checkstyle:IllegalCatch")
303     public void removeAndCloseDataChangeListener(final ListenerAdapter listener) {
304         final long stamp = dataChangeListenersLock.writeLock();
305         try {
306             removeAndCloseDataChangeListenerTemplate(listener);
307         } catch (final Exception exception) {
308             LOG.error("Data-change listener {} cannot be closed.", listener, exception);
309         } finally {
310             dataChangeListenersLock.unlockWrite(stamp);
311         }
312     }
313
314     /**
315      * Removes and closes data-change listener of type {@link ListenerAdapter} specified in parameter.
316      *
317      * @param listener Listener to be closed and removed.
318      */
319     private void removeAndCloseDataChangeListenerTemplate(final ListenerAdapter listener) {
320         try {
321             requireNonNull(listener).close();
322             if (dataChangeListeners.inverse().remove(listener) == null) {
323                 LOG.warn("There isn't any data-change event stream that would match listener adapter {}.", listener);
324             }
325         } catch (final InterruptedException | ExecutionException exception) {
326             LOG.error("Data-change listener {} cannot be closed.", listener, exception);
327             throw new IllegalStateException(String.format(
328                     "Data-change listener %s cannot be closed.",
329                     listener), exception);
330         }
331     }
332
333     /**
334      * Removes and closes notification listener of type {@link NotificationListenerAdapter} specified in parameter.
335      *
336      * @param listener Listener to be closed and removed.
337      */
338     @SuppressWarnings("checkstyle:IllegalCatch")
339     public void removeAndCloseNotificationListener(final NotificationListenerAdapter listener) {
340         final long stamp = notificationListenersLock.writeLock();
341         try {
342             removeAndCloseNotificationListenerTemplate(listener);
343         } catch (final Exception exception) {
344             LOG.error("Notification listener {} cannot be closed.", listener, exception);
345         } finally {
346             notificationListenersLock.unlockWrite(stamp);
347         }
348     }
349
350     /**
351      * Removes and closes device notification listener of type {@link NotificationListenerAdapter}
352      * specified in parameter.
353      *
354      * @param listener Listener to be closed and removed.
355      */
356     @SuppressWarnings("checkstyle:IllegalCatch")
357     public void removeAndCloseDeviceNotificationListener(final DeviceNotificationListenerAdaptor listener) {
358         final long stamp = deviceNotificationListenersLock.writeLock();
359         try {
360             requireNonNull(listener);
361             if (deviceNotificationListeners.inverse().remove(listener) == null) {
362                 LOG.warn("There isn't any device notification stream that would match listener adapter {}.", listener);
363             }
364         } catch (final Exception exception) {
365             LOG.error("Device Notification listener {} cannot be closed.", listener, exception);
366         } finally {
367             deviceNotificationListenersLock.unlockWrite(stamp);
368         }
369     }
370
371     private void removeAndCloseNotificationListenerTemplate(final NotificationListenerAdapter listener) {
372         try {
373             requireNonNull(listener).close();
374             if (notificationListeners.inverse().remove(listener) == null) {
375                 LOG.warn("There isn't any notification stream that would match listener adapter {}.", listener);
376             }
377         } catch (final InterruptedException | ExecutionException exception) {
378             LOG.error("Notification listener {} cannot be closed.", listener, exception);
379             throw new IllegalStateException(String.format(
380                     "Notification listener %s cannot be closed.", listener),
381                     exception);
382         }
383     }
384
385     /**
386      * Removal and closing of general listener (data-change or notification listener).
387      *
388      * @param listener Listener to be closed and removed from cache.
389      */
390     void removeAndCloseListener(final BaseListenerInterface listener) {
391         requireNonNull(listener);
392         if (listener instanceof ListenerAdapter) {
393             removeAndCloseDataChangeListener((ListenerAdapter) listener);
394         } else if (listener instanceof NotificationListenerAdapter) {
395             removeAndCloseNotificationListener((NotificationListenerAdapter) listener);
396         }
397     }
398
399     /**
400      * Creates string representation of stream name from URI. Removes slash from URI in start and end positions,
401      * and optionally {@link RestconfConstants#BASE_URI_PATTERN} prefix.
402      *
403      * @param uri URI for creation of stream name.
404      * @return String representation of stream name.
405      */
406     public static String createStreamNameFromUri(final String uri) {
407         String result = requireNonNull(uri);
408         while (true) {
409             if (result.startsWith(RestconfConstants.BASE_URI_PATTERN)) {
410                 result = result.substring(RestconfConstants.BASE_URI_PATTERN.length());
411             } else if (result.startsWith("/")) {
412                 result = result.substring(1);
413             } else {
414                 break;
415             }
416         }
417         if (result.endsWith("/")) {
418             result = result.substring(0, result.length() - 1);
419         }
420         return result;
421     }
422
423     @VisibleForTesting
424     public synchronized void setDataChangeListeners(final Map<String, ListenerAdapter> listenerAdapterCollection) {
425         final long stamp = dataChangeListenersLock.writeLock();
426         try {
427             dataChangeListeners.clear();
428             dataChangeListeners.putAll(listenerAdapterCollection);
429         } finally {
430             dataChangeListenersLock.unlockWrite(stamp);
431         }
432     }
433 }