3b83c9022512c9e3817eb4a8ecafb17a35e6aa5b
[netconf.git] / restconf / restconf-nb-rfc8040 / src / main / java / org / opendaylight / restconf / nb / rfc8040 / streams / listeners / ListenersBroker.java
1 /*
2  * Copyright © 2019 FRINX s.r.o. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.restconf.nb.rfc8040.streams.listeners;
9
10 import static java.util.Objects.requireNonNull;
11
12 import com.google.common.annotations.VisibleForTesting;
13 import com.google.common.collect.BiMap;
14 import com.google.common.collect.HashBiMap;
15 import com.google.common.collect.ImmutableSet;
16 import java.util.Map;
17 import java.util.Optional;
18 import java.util.Set;
19 import java.util.concurrent.ExecutionException;
20 import java.util.concurrent.locks.StampedLock;
21 import java.util.function.Function;
22 import org.opendaylight.restconf.nb.rfc8040.rests.utils.RestconfStreamsConstants;
23 import org.opendaylight.restconf.nb.rfc8040.utils.RestconfConstants;
24 import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev140708.NotificationOutputTypeGrouping.NotificationOutputType;
25 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
26 import org.opendaylight.yangtools.yang.model.api.NotificationDefinition;
27 import org.opendaylight.yangtools.yang.model.api.stmt.SchemaNodeIdentifier.Absolute;
28 import org.slf4j.Logger;
29 import org.slf4j.LoggerFactory;
30
31 /**
32  * This singleton class is responsible for creation, removal and searching for {@link ListenerAdapter} or
33  * {@link NotificationListenerAdapter} listeners.
34  */
35 // FIXME: this should be a component
36 public final class ListenersBroker {
37     private static final class Holder {
38         // FIXME: remove this global singleton
39         static final ListenersBroker INSTANCE = new ListenersBroker();
40     }
41
42     private static final Logger LOG = LoggerFactory.getLogger(ListenersBroker.class);
43
44     private final StampedLock dataChangeListenersLock = new StampedLock();
45     private final StampedLock notificationListenersLock = new StampedLock();
46     private final BiMap<String, ListenerAdapter> dataChangeListeners = HashBiMap.create();
47     private final BiMap<String, NotificationListenerAdapter> notificationListeners = HashBiMap.create();
48
49     private ListenersBroker() {
50
51     }
52
53     /**
54      * Creation of the singleton listeners broker.
55      *
56      * @return Reusable instance of {@link ListenersBroker}.
57      */
58     public static ListenersBroker getInstance() {
59         return Holder.INSTANCE;
60     }
61
62     /**
63      * Returns set of all data-change-event streams.
64      */
65     public Set<String> getDataChangeStreams() {
66         final long stamp = dataChangeListenersLock.readLock();
67         try {
68             return ImmutableSet.copyOf(dataChangeListeners.keySet());
69         } finally {
70             dataChangeListenersLock.unlockRead(stamp);
71         }
72     }
73
74     /**
75      * Returns set of all notification streams.
76      */
77     public Set<String> getNotificationStreams() {
78         final long stamp = notificationListenersLock.readLock();
79         try {
80             return ImmutableSet.copyOf(notificationListeners.keySet());
81         } finally {
82             notificationListenersLock.unlockRead(stamp);
83         }
84     }
85
86     /**
87      * Gets {@link ListenerAdapter} specified by stream identification.
88      *
89      * @param streamName Stream name.
90      * @return {@link ListenerAdapter} specified by stream name wrapped in {@link Optional} or {@link Optional#empty()}
91      *     if listener with specified stream name doesn't exist.
92      */
93     public Optional<ListenerAdapter> getDataChangeListenerFor(final String streamName) {
94         final long stamp = dataChangeListenersLock.readLock();
95         try {
96             return Optional.ofNullable(dataChangeListeners.get(requireNonNull(streamName)));
97         } finally {
98             dataChangeListenersLock.unlockRead(stamp);
99         }
100     }
101
102     /**
103      * Gets {@link NotificationListenerAdapter} specified by stream name.
104      *
105      * @param streamName Stream name.
106      * @return {@link NotificationListenerAdapter} specified by stream name wrapped in {@link Optional}
107      *     or {@link Optional#empty()} if listener with specified stream name doesn't exist.
108      */
109     public Optional<NotificationListenerAdapter> getNotificationListenerFor(final String streamName) {
110         final long stamp = notificationListenersLock.readLock();
111         try {
112             return Optional.ofNullable(notificationListeners.get(requireNonNull(streamName)));
113         } finally {
114             notificationListenersLock.unlockRead(stamp);
115         }
116     }
117
118     /**
119      * Get listener for stream-name.
120      *
121      * @param streamName Stream name.
122      * @return {@link NotificationListenerAdapter} or {@link ListenerAdapter} object wrapped in {@link Optional}
123      *     or {@link Optional#empty()} if listener with specified stream name doesn't exist.
124      */
125     public Optional<BaseListenerInterface> getListenerFor(final String streamName) {
126         if (streamName.startsWith(RestconfStreamsConstants.NOTIFICATION_STREAM)) {
127             return getNotificationListenerFor(streamName).map(Function.identity());
128         } else if (streamName.startsWith(RestconfStreamsConstants.DATA_SUBSCRIPTION)) {
129             return getDataChangeListenerFor(streamName).map(Function.identity());
130         } else {
131             return Optional.empty();
132         }
133     }
134
135     /**
136      * Creates new {@link ListenerAdapter} listener using input stream name and path if such listener
137      * hasn't been created yet.
138      *
139      * @param path       Path to data in data repository.
140      * @param streamName Stream name.
141      * @param outputType Specific type of output for notifications - XML or JSON.
142      * @return Created or existing data-change listener adapter.
143      */
144     public ListenerAdapter registerDataChangeListener(final YangInstanceIdentifier path, final String streamName,
145             final NotificationOutputType outputType) {
146         requireNonNull(path);
147         requireNonNull(streamName);
148         requireNonNull(outputType);
149
150         final long stamp = dataChangeListenersLock.writeLock();
151         try {
152             return dataChangeListeners.computeIfAbsent(streamName,
153                 stream -> new ListenerAdapter(path, stream, outputType));
154         } finally {
155             dataChangeListenersLock.unlockWrite(stamp);
156         }
157     }
158
159     /**
160      * Creates new {@link NotificationDefinition} listener using input stream name and schema path
161      * if such listener haven't been created yet.
162      *
163      * @param schemaPath Schema path of YANG notification structure.
164      * @param streamName Stream name.
165      * @param outputType Specific type of output for notifications - XML or JSON.
166      * @return Created or existing notification listener adapter.
167      */
168     public NotificationListenerAdapter registerNotificationListener(final Absolute schemaPath,
169             final String streamName, final NotificationOutputType outputType) {
170         requireNonNull(schemaPath);
171         requireNonNull(streamName);
172         requireNonNull(outputType);
173
174         final long stamp = notificationListenersLock.writeLock();
175         try {
176             return notificationListeners.computeIfAbsent(streamName,
177                 stream -> new NotificationListenerAdapter(schemaPath, stream, outputType));
178         } finally {
179             notificationListenersLock.unlockWrite(stamp);
180         }
181     }
182
183     /**
184      * Removal and closing of all data-change-event and notification listeners.
185      */
186     public synchronized void removeAndCloseAllListeners() {
187         final long stampNotifications = notificationListenersLock.writeLock();
188         final long stampDataChanges = dataChangeListenersLock.writeLock();
189         try {
190             removeAndCloseAllDataChangeListenersTemplate();
191             removeAndCloseAllNotificationListenersTemplate();
192         } finally {
193             dataChangeListenersLock.unlockWrite(stampDataChanges);
194             notificationListenersLock.unlockWrite(stampNotifications);
195         }
196     }
197
198     /**
199      * Closes and removes all data-change listeners.
200      */
201     public void removeAndCloseAllDataChangeListeners() {
202         final long stamp = dataChangeListenersLock.writeLock();
203         try {
204             removeAndCloseAllDataChangeListenersTemplate();
205         } finally {
206             dataChangeListenersLock.unlockWrite(stamp);
207         }
208     }
209
210     @SuppressWarnings("checkstyle:IllegalCatch")
211     private void removeAndCloseAllDataChangeListenersTemplate() {
212         dataChangeListeners.values()
213                 .forEach(listenerAdapter -> {
214                     try {
215                         listenerAdapter.close();
216                     } catch (final Exception exception) {
217                         LOG.error("Failed to close data-change listener {}.", listenerAdapter, exception);
218                         throw new IllegalStateException(String.format("Failed to close data-change listener %s.",
219                                 listenerAdapter), exception);
220                     }
221                 });
222         dataChangeListeners.clear();
223     }
224
225     /**
226      * Closes and removes all notification listeners.
227      */
228     public void removeAndCloseAllNotificationListeners() {
229         final long stamp = notificationListenersLock.writeLock();
230         try {
231             removeAndCloseAllNotificationListenersTemplate();
232         } finally {
233             notificationListenersLock.unlockWrite(stamp);
234         }
235     }
236
237     @SuppressWarnings("checkstyle:IllegalCatch")
238     private void removeAndCloseAllNotificationListenersTemplate() {
239         notificationListeners.values()
240                 .forEach(listenerAdapter -> {
241                     try {
242                         listenerAdapter.close();
243                     } catch (final Exception exception) {
244                         LOG.error("Failed to close notification listener {}.", listenerAdapter, exception);
245                         throw new IllegalStateException(String.format("Failed to close notification listener %s.",
246                                 listenerAdapter), exception);
247                     }
248                 });
249         notificationListeners.clear();
250     }
251
252     /**
253      * Removes and closes data-change listener of type {@link ListenerAdapter} specified in parameter.
254      *
255      * @param listener Listener to be closed and removed.
256      */
257     @SuppressWarnings("checkstyle:IllegalCatch")
258     public void removeAndCloseDataChangeListener(final ListenerAdapter listener) {
259         final long stamp = dataChangeListenersLock.writeLock();
260         try {
261             removeAndCloseDataChangeListenerTemplate(listener);
262         } catch (final Exception exception) {
263             LOG.error("Data-change listener {} cannot be closed.", listener, exception);
264         } finally {
265             dataChangeListenersLock.unlockWrite(stamp);
266         }
267     }
268
269     /**
270      * Removes and closes data-change listener of type {@link ListenerAdapter} specified in parameter.
271      *
272      * @param listener Listener to be closed and removed.
273      */
274     private void removeAndCloseDataChangeListenerTemplate(final ListenerAdapter listener) {
275         try {
276             requireNonNull(listener).close();
277             if (dataChangeListeners.inverse().remove(listener) == null) {
278                 LOG.warn("There isn't any data-change event stream that would match listener adapter {}.", listener);
279             }
280         } catch (final InterruptedException | ExecutionException exception) {
281             LOG.error("Data-change listener {} cannot be closed.", listener, exception);
282             throw new IllegalStateException(String.format(
283                     "Data-change listener %s cannot be closed.",
284                     listener), exception);
285         }
286     }
287
288     /**
289      * Removes and closes notification listener of type {@link NotificationListenerAdapter} specified in parameter.
290      *
291      * @param listener Listener to be closed and removed.
292      */
293     @SuppressWarnings("checkstyle:IllegalCatch")
294     public void removeAndCloseNotificationListener(final NotificationListenerAdapter listener) {
295         final long stamp = notificationListenersLock.writeLock();
296         try {
297             removeAndCloseNotificationListenerTemplate(listener);
298         } catch (final Exception exception) {
299             LOG.error("Notification listener {} cannot be closed.", listener, exception);
300         } finally {
301             notificationListenersLock.unlockWrite(stamp);
302         }
303     }
304
305     private void removeAndCloseNotificationListenerTemplate(final NotificationListenerAdapter listener) {
306         try {
307             requireNonNull(listener).close();
308             if (notificationListeners.inverse().remove(listener) == null) {
309                 LOG.warn("There isn't any notification stream that would match listener adapter {}.", listener);
310             }
311         } catch (final InterruptedException | ExecutionException exception) {
312             LOG.error("Notification listener {} cannot be closed.", listener, exception);
313             throw new IllegalStateException(String.format(
314                     "Notification listener %s cannot be closed.", listener),
315                     exception);
316         }
317     }
318
319     /**
320      * Removal and closing of general listener (data-change or notification listener).
321      *
322      * @param listener Listener to be closed and removed from cache.
323      */
324     void removeAndCloseListener(final BaseListenerInterface listener) {
325         requireNonNull(listener);
326         if (listener instanceof ListenerAdapter) {
327             removeAndCloseDataChangeListener((ListenerAdapter) listener);
328         } else if (listener instanceof NotificationListenerAdapter) {
329             removeAndCloseNotificationListener((NotificationListenerAdapter) listener);
330         }
331     }
332
333     /**
334      * Creates string representation of stream name from URI. Removes slash from URI in start and end positions,
335      * and optionally {@link RestconfConstants#BASE_URI_PATTERN} prefix.
336      *
337      * @param uri URI for creation of stream name.
338      * @return String representation of stream name.
339      */
340     public static String createStreamNameFromUri(final String uri) {
341         String result = requireNonNull(uri);
342         while (true) {
343             if (result.startsWith(RestconfConstants.BASE_URI_PATTERN)) {
344                 result = result.substring(RestconfConstants.BASE_URI_PATTERN.length());
345             } else if (result.startsWith("/")) {
346                 result = result.substring(1);
347             } else {
348                 break;
349             }
350         }
351         if (result.endsWith("/")) {
352             result = result.substring(0, result.length() - 1);
353         }
354         return result;
355     }
356
357     @VisibleForTesting
358     public synchronized void setDataChangeListeners(final Map<String, ListenerAdapter> listenerAdapterCollection) {
359         final long stamp = dataChangeListenersLock.writeLock();
360         try {
361             dataChangeListeners.clear();
362             dataChangeListeners.putAll(listenerAdapterCollection);
363         } finally {
364             dataChangeListenersLock.unlockWrite(stamp);
365         }
366     }
367 }