Refactoring of web-sockets in RESTCONF RFC-8040
[netconf.git] / restconf / restconf-nb-rfc8040 / src / main / java / org / opendaylight / restconf / nb / rfc8040 / streams / listeners / ListenersBroker.java
1 /*
2  * Copyright © 2019 FRINX s.r.o. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.restconf.nb.rfc8040.streams.listeners;
10
11 import static java.util.Objects.requireNonNull;
12
13 import com.google.common.annotations.VisibleForTesting;
14 import com.google.common.collect.BiMap;
15 import com.google.common.collect.HashBiMap;
16 import com.google.common.collect.ImmutableSet;
17 import java.util.Map;
18 import java.util.Optional;
19 import java.util.Set;
20 import java.util.concurrent.locks.StampedLock;
21 import java.util.function.Function;
22 import org.opendaylight.restconf.nb.rfc8040.rests.utils.RestconfStreamsConstants;
23 import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev140708.NotificationOutputTypeGrouping.NotificationOutputType;
24 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
25 import org.opendaylight.yangtools.yang.model.api.NotificationDefinition;
26 import org.opendaylight.yangtools.yang.model.api.SchemaPath;
27 import org.slf4j.Logger;
28 import org.slf4j.LoggerFactory;
29
30 /**
31  * This singleton class is responsible for creation, removal and searching for {@link ListenerAdapter} or
32  * {@link NotificationListenerAdapter} listeners.
33  */
34 public final class ListenersBroker {
35     private static final Logger LOG = LoggerFactory.getLogger(ListenersBroker.class);
36     private static ListenersBroker listenersBroker;
37
38     private final StampedLock dataChangeListenersLock = new StampedLock();
39     private final StampedLock notificationListenersLock = new StampedLock();
40     private final BiMap<String, ListenerAdapter> dataChangeListeners = HashBiMap.create();
41     private final BiMap<String, NotificationListenerAdapter> notificationListeners = HashBiMap.create();
42
43     private ListenersBroker() {
44     }
45
46     /**
47      * Creation of the singleton listeners broker.
48      *
49      * @return Reusable instance of {@link ListenersBroker}.
50      */
51     public static synchronized ListenersBroker getInstance() {
52         if (listenersBroker == null) {
53             listenersBroker = new ListenersBroker();
54         }
55         return listenersBroker;
56     }
57
58     /**
59      * Returns set of all data-change-event streams.
60      */
61     public Set<String> getDataChangeStreams() {
62         final long stamp = dataChangeListenersLock.readLock();
63         try {
64             return ImmutableSet.copyOf(dataChangeListeners.keySet());
65         } finally {
66             dataChangeListenersLock.unlockRead(stamp);
67         }
68     }
69
70     /**
71      * Returns set of all notification streams.
72      */
73     public Set<String> getNotificationStreams() {
74         final long stamp = notificationListenersLock.readLock();
75         try {
76             return ImmutableSet.copyOf(notificationListeners.keySet());
77         } finally {
78             notificationListenersLock.unlockRead(stamp);
79         }
80     }
81
82     /**
83      * Gets {@link ListenerAdapter} specified by stream identification.
84      *
85      * @param streamName Stream name.
86      * @return {@link ListenerAdapter} specified by stream name wrapped in {@link Optional} or {@link Optional#empty()}
87      *     if listener with specified stream name doesn't exist.
88      */
89     public Optional<ListenerAdapter> getDataChangeListenerFor(final String streamName) {
90         final long stamp = dataChangeListenersLock.readLock();
91         try {
92             final ListenerAdapter listenerAdapter = dataChangeListeners.get(requireNonNull(streamName));
93             return Optional.ofNullable(listenerAdapter);
94         } finally {
95             dataChangeListenersLock.unlockRead(stamp);
96         }
97     }
98
99     /**
100      * Gets {@link NotificationListenerAdapter} specified by stream name.
101      *
102      * @param streamName Stream name.
103      * @return {@link NotificationListenerAdapter} specified by stream name wrapped in {@link Optional}
104      *     or {@link Optional#empty()} if listener with specified stream name doesn't exist.
105      */
106     public Optional<NotificationListenerAdapter> getNotificationListenerFor(final String streamName) {
107         final long stamp = notificationListenersLock.readLock();
108         try {
109             final NotificationListenerAdapter listenerAdapter = notificationListeners.get(requireNonNull(streamName));
110             return Optional.ofNullable(listenerAdapter);
111         } finally {
112             notificationListenersLock.unlockRead(stamp);
113         }
114     }
115
116     /**
117      * Get listener for stream-name.
118      *
119      * @param streamName Stream name.
120      * @return {@link NotificationListenerAdapter} or {@link ListenerAdapter} object wrapped in {@link Optional}
121      *     or {@link Optional#empty()} if listener with specified stream name doesn't exist.
122      */
123     public Optional<BaseListenerInterface> getListenerFor(final String streamName) {
124         if (streamName.startsWith(RestconfStreamsConstants.NOTIFICATION_STREAM)) {
125             return getNotificationListenerFor(streamName).map(Function.identity());
126         } else if (streamName.startsWith(RestconfStreamsConstants.DATA_SUBSCRIPTION)) {
127             return getDataChangeListenerFor(streamName).map(Function.identity());
128         } else {
129             return Optional.empty();
130         }
131     }
132
133     /**
134      * Creates new {@link ListenerAdapter} listener using input stream name and path if such listener
135      * hasn't been created yet.
136      *
137      * @param path       Path to data in data repository.
138      * @param streamName Stream name.
139      * @param outputType Specific type of output for notifications - XML or JSON.
140      * @return Created or existing data-change listener adapter.
141      */
142     public ListenerAdapter registerDataChangeListener(final YangInstanceIdentifier path, final String streamName,
143             final NotificationOutputType outputType) {
144         requireNonNull(path);
145         requireNonNull(streamName);
146         requireNonNull(outputType);
147
148         final long stamp = dataChangeListenersLock.writeLock();
149         try {
150             return dataChangeListeners.computeIfAbsent(streamName, stream -> new ListenerAdapter(
151                     path, stream, outputType));
152         } finally {
153             dataChangeListenersLock.unlockWrite(stamp);
154         }
155     }
156
157     /**
158      * Creates new {@link NotificationDefinition} listener using input stream name and schema path
159      * if such listener haven't been created yet.
160      *
161      * @param schemaPath Schema path of YANG notification structure.
162      * @param streamName Stream name.
163      * @param outputType Specific type of output for notifications - XML or JSON.
164      * @return Created or existing notification listener adapter.
165      */
166     public NotificationListenerAdapter registerNotificationListener(final SchemaPath schemaPath,
167             final String streamName, final NotificationOutputType outputType) {
168         requireNonNull(schemaPath);
169         requireNonNull(streamName);
170         requireNonNull(outputType);
171
172         final long stamp = notificationListenersLock.writeLock();
173         try {
174             return notificationListeners.computeIfAbsent(streamName, stream -> new NotificationListenerAdapter(
175                     schemaPath, stream, outputType.getName()));
176         } finally {
177             notificationListenersLock.unlockWrite(stamp);
178         }
179     }
180
181     /**
182      * Removal and closing of all data-change-event and notification listeners.
183      */
184     public synchronized void removeAndCloseAllListeners() {
185         final long stampNotifications = notificationListenersLock.writeLock();
186         final long stampDataChanges = dataChangeListenersLock.writeLock();
187         try {
188             removeAndCloseAllDataChangeListenersTemplate();
189             removeAndCloseAllNotificationListenersTemplate();
190         } finally {
191             dataChangeListenersLock.unlockWrite(stampDataChanges);
192             notificationListenersLock.unlockWrite(stampNotifications);
193         }
194     }
195
196     /**
197      * Closes and removes all data-change listeners.
198      */
199     public void removeAndCloseAllDataChangeListeners() {
200         final long stamp = dataChangeListenersLock.writeLock();
201         try {
202             removeAndCloseAllDataChangeListenersTemplate();
203         } finally {
204             dataChangeListenersLock.unlockWrite(stamp);
205         }
206     }
207
208     @SuppressWarnings("checkstyle:IllegalCatch")
209     private void removeAndCloseAllDataChangeListenersTemplate() {
210         dataChangeListeners.values()
211                 .forEach(listenerAdapter -> {
212                     try {
213                         listenerAdapter.close();
214                     } catch (final Exception exception) {
215                         LOG.error("Failed to close data-change listener {}.", listenerAdapter, exception);
216                         throw new IllegalStateException(String.format("Failed to close data-change listener %s.",
217                                 listenerAdapter), exception);
218                     }
219                 });
220         dataChangeListeners.clear();
221     }
222
223     /**
224      * Closes and removes all notification listeners.
225      */
226     public void removeAndCloseAllNotificationListeners() {
227         final long stamp = notificationListenersLock.writeLock();
228         try {
229             removeAndCloseAllNotificationListenersTemplate();
230         } finally {
231             notificationListenersLock.unlockWrite(stamp);
232         }
233     }
234
235     @SuppressWarnings("checkstyle:IllegalCatch")
236     private void removeAndCloseAllNotificationListenersTemplate() {
237         notificationListeners.values()
238                 .forEach(listenerAdapter -> {
239                     try {
240                         listenerAdapter.close();
241                     } catch (final Exception exception) {
242                         LOG.error("Failed to close notification listener {}.", listenerAdapter, exception);
243                         throw new IllegalStateException(String.format("Failed to close notification listener %s.",
244                                 listenerAdapter), exception);
245                     }
246                 });
247         notificationListeners.clear();
248     }
249
250     /**
251      * Removes and closes data-change listener of type {@link ListenerAdapter} specified in parameter.
252      *
253      * @param listener Listener to be closed and removed.
254      */
255     @SuppressWarnings("checkstyle:IllegalCatch")
256     public void removeAndCloseDataChangeListener(final ListenerAdapter listener) {
257         final long stamp = dataChangeListenersLock.writeLock();
258         try {
259             removeAndCloseDataChangeListenerTemplate(listener);
260         } catch (final Exception exception) {
261             LOG.error("Data-change listener {} cannot be closed.", listener, exception);
262         } finally {
263             dataChangeListenersLock.unlockWrite(stamp);
264         }
265     }
266
267     /**
268      * Removes and closes data-change listener of type {@link ListenerAdapter} specified in parameter.
269      *
270      * @param listener Listener to be closed and removed.
271      */
272     @SuppressWarnings("checkstyle:IllegalCatch")
273     private void removeAndCloseDataChangeListenerTemplate(final ListenerAdapter listener) {
274         final long stamp = dataChangeListenersLock.writeLock();
275         try {
276             requireNonNull(listener).close();
277             if (dataChangeListeners.inverse().remove(listener) == null) {
278                 LOG.warn("There isn't any data-change event stream that would match listener adapter {}.", listener);
279             }
280         } catch (final Exception exception) {
281             LOG.error("Data-change listener {} cannot be closed.", listener, exception);
282             throw new IllegalStateException(String.format(
283                     "Data-change listener %s cannot be closed.",
284                     listener), exception);
285         } finally {
286             dataChangeListenersLock.unlockWrite(stamp);
287         }
288     }
289
290     /**
291      * Removes and closes notification listener of type {@link NotificationListenerAdapter} specified in parameter.
292      *
293      * @param listener Listener to be closed and removed.
294      */
295     @SuppressWarnings("checkstyle:IllegalCatch")
296     public void removeAndCloseNotificationListener(final NotificationListenerAdapter listener) {
297         final long stamp = notificationListenersLock.writeLock();
298         try {
299             removeAndCloseNotificationListenerTemplate(listener);
300         } catch (final Exception exception) {
301             LOG.error("Notification listener {} cannot be closed.", listener, exception);
302         } finally {
303             notificationListenersLock.unlockWrite(stamp);
304         }
305     }
306
307     @SuppressWarnings({"checkstyle:IllegalCatch"})
308     private void removeAndCloseNotificationListenerTemplate(NotificationListenerAdapter listener) {
309         try {
310             requireNonNull(listener).close();
311             if (notificationListeners.inverse().remove(listener) == null) {
312                 LOG.warn("There isn't any notification stream that would match listener adapter {}.", listener);
313             }
314         } catch (final Exception exception) {
315             LOG.error("Notification listener {} cannot be closed.", listener, exception);
316             throw new IllegalStateException(String.format(
317                     "Notification listener %s cannot be closed.", listener),
318                     exception);
319         }
320     }
321
322     /**
323      * Removal and closing of general listener (data-change or notification listener).
324      *
325      * @param listener Listener to be closed and removed from cache.
326      */
327     void removeAndCloseListener(final BaseListenerInterface listener) {
328         requireNonNull(listener);
329         if (listener instanceof ListenerAdapter) {
330             removeAndCloseDataChangeListener((ListenerAdapter) listener);
331         } else if (listener instanceof NotificationListenerAdapter) {
332             removeAndCloseNotificationListener((NotificationListenerAdapter) listener);
333         }
334     }
335
336     /**
337      * Creates string representation of stream name from URI. Removes slash from URI in start and end positions.
338      *
339      * @param uri URI for creation of stream name.
340      * @return String representation of stream name.
341      */
342     public static String createStreamNameFromUri(final String uri) {
343         String result = requireNonNull(uri);
344         if (result.startsWith("/")) {
345             result = result.substring(1);
346         }
347         if (result.endsWith("/")) {
348             result = result.substring(0, result.length() - 1);
349         }
350         return result;
351     }
352
353     @VisibleForTesting
354     public synchronized void setDataChangeListeners(final Map<String, ListenerAdapter> listenerAdapterCollection) {
355         final long stamp = dataChangeListenersLock.writeLock();
356         try {
357             dataChangeListeners.clear();
358             dataChangeListeners.putAll(listenerAdapterCollection);
359         } finally {
360             dataChangeListenersLock.unlockWrite(stamp);
361         }
362     }
363 }