d1dadcf22e7eea547db73b0baf0b0deb65f09ef3
[netconf.git] / restconf / restconf-nb-rfc8040 / src / main / java / org / opendaylight / restconf / nb / rfc8040 / streams / listeners / ListenersBroker.java
1 /*
2  * Copyright © 2019 FRINX s.r.o. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.restconf.nb.rfc8040.streams.listeners;
10
11 import static java.util.Objects.requireNonNull;
12
13 import com.google.common.annotations.VisibleForTesting;
14 import com.google.common.collect.BiMap;
15 import com.google.common.collect.HashBiMap;
16 import com.google.common.collect.ImmutableSet;
17 import java.util.Map;
18 import java.util.Optional;
19 import java.util.Set;
20 import java.util.concurrent.locks.StampedLock;
21 import java.util.function.Function;
22 import org.opendaylight.restconf.nb.rfc8040.rests.utils.RestconfStreamsConstants;
23 import org.opendaylight.restconf.nb.rfc8040.utils.RestconfConstants;
24 import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev140708.NotificationOutputTypeGrouping.NotificationOutputType;
25 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
26 import org.opendaylight.yangtools.yang.model.api.NotificationDefinition;
27 import org.opendaylight.yangtools.yang.model.api.stmt.SchemaNodeIdentifier.Absolute;
28 import org.slf4j.Logger;
29 import org.slf4j.LoggerFactory;
30
31 /**
32  * This singleton class is responsible for creation, removal and searching for {@link ListenerAdapter} or
33  * {@link NotificationListenerAdapter} listeners.
34  */
35 public final class ListenersBroker {
36     private static final Logger LOG = LoggerFactory.getLogger(ListenersBroker.class);
37     private static ListenersBroker listenersBroker;
38
39     private final StampedLock dataChangeListenersLock = new StampedLock();
40     private final StampedLock notificationListenersLock = new StampedLock();
41     private final BiMap<String, ListenerAdapter> dataChangeListeners = HashBiMap.create();
42     private final BiMap<String, NotificationListenerAdapter> notificationListeners = HashBiMap.create();
43
44     private ListenersBroker() {
45     }
46
47     /**
48      * Creation of the singleton listeners broker.
49      *
50      * @return Reusable instance of {@link ListenersBroker}.
51      */
52     public static synchronized ListenersBroker getInstance() {
53         if (listenersBroker == null) {
54             listenersBroker = new ListenersBroker();
55         }
56         return listenersBroker;
57     }
58
59     /**
60      * Returns set of all data-change-event streams.
61      */
62     public Set<String> getDataChangeStreams() {
63         final long stamp = dataChangeListenersLock.readLock();
64         try {
65             return ImmutableSet.copyOf(dataChangeListeners.keySet());
66         } finally {
67             dataChangeListenersLock.unlockRead(stamp);
68         }
69     }
70
71     /**
72      * Returns set of all notification streams.
73      */
74     public Set<String> getNotificationStreams() {
75         final long stamp = notificationListenersLock.readLock();
76         try {
77             return ImmutableSet.copyOf(notificationListeners.keySet());
78         } finally {
79             notificationListenersLock.unlockRead(stamp);
80         }
81     }
82
83     /**
84      * Gets {@link ListenerAdapter} specified by stream identification.
85      *
86      * @param streamName Stream name.
87      * @return {@link ListenerAdapter} specified by stream name wrapped in {@link Optional} or {@link Optional#empty()}
88      *     if listener with specified stream name doesn't exist.
89      */
90     public Optional<ListenerAdapter> getDataChangeListenerFor(final String streamName) {
91         final long stamp = dataChangeListenersLock.readLock();
92         try {
93             final ListenerAdapter listenerAdapter = dataChangeListeners.get(requireNonNull(streamName));
94             return Optional.ofNullable(listenerAdapter);
95         } finally {
96             dataChangeListenersLock.unlockRead(stamp);
97         }
98     }
99
100     /**
101      * Gets {@link NotificationListenerAdapter} specified by stream name.
102      *
103      * @param streamName Stream name.
104      * @return {@link NotificationListenerAdapter} specified by stream name wrapped in {@link Optional}
105      *     or {@link Optional#empty()} if listener with specified stream name doesn't exist.
106      */
107     public Optional<NotificationListenerAdapter> getNotificationListenerFor(final String streamName) {
108         final long stamp = notificationListenersLock.readLock();
109         try {
110             final NotificationListenerAdapter listenerAdapter = notificationListeners.get(requireNonNull(streamName));
111             return Optional.ofNullable(listenerAdapter);
112         } finally {
113             notificationListenersLock.unlockRead(stamp);
114         }
115     }
116
117     /**
118      * Get listener for stream-name.
119      *
120      * @param streamName Stream name.
121      * @return {@link NotificationListenerAdapter} or {@link ListenerAdapter} object wrapped in {@link Optional}
122      *     or {@link Optional#empty()} if listener with specified stream name doesn't exist.
123      */
124     public Optional<BaseListenerInterface> getListenerFor(final String streamName) {
125         if (streamName.startsWith(RestconfStreamsConstants.NOTIFICATION_STREAM)) {
126             return getNotificationListenerFor(streamName).map(Function.identity());
127         } else if (streamName.startsWith(RestconfStreamsConstants.DATA_SUBSCRIPTION)) {
128             return getDataChangeListenerFor(streamName).map(Function.identity());
129         } else {
130             return Optional.empty();
131         }
132     }
133
134     /**
135      * Creates new {@link ListenerAdapter} listener using input stream name and path if such listener
136      * hasn't been created yet.
137      *
138      * @param path       Path to data in data repository.
139      * @param streamName Stream name.
140      * @param outputType Specific type of output for notifications - XML or JSON.
141      * @return Created or existing data-change listener adapter.
142      */
143     public ListenerAdapter registerDataChangeListener(final YangInstanceIdentifier path, final String streamName,
144             final NotificationOutputType outputType) {
145         requireNonNull(path);
146         requireNonNull(streamName);
147         requireNonNull(outputType);
148
149         final long stamp = dataChangeListenersLock.writeLock();
150         try {
151             return dataChangeListeners.computeIfAbsent(streamName, stream -> new ListenerAdapter(
152                     path, stream, outputType));
153         } finally {
154             dataChangeListenersLock.unlockWrite(stamp);
155         }
156     }
157
158     /**
159      * Creates new {@link NotificationDefinition} listener using input stream name and schema path
160      * if such listener haven't been created yet.
161      *
162      * @param schemaPath Schema path of YANG notification structure.
163      * @param streamName Stream name.
164      * @param outputType Specific type of output for notifications - XML or JSON.
165      * @return Created or existing notification listener adapter.
166      */
167     public NotificationListenerAdapter registerNotificationListener(final Absolute schemaPath,
168             final String streamName, final NotificationOutputType outputType) {
169         requireNonNull(schemaPath);
170         requireNonNull(streamName);
171         requireNonNull(outputType);
172
173         final long stamp = notificationListenersLock.writeLock();
174         try {
175             return notificationListeners.computeIfAbsent(streamName, stream -> new NotificationListenerAdapter(
176                     schemaPath, stream, outputType.getName()));
177         } finally {
178             notificationListenersLock.unlockWrite(stamp);
179         }
180     }
181
182     /**
183      * Removal and closing of all data-change-event and notification listeners.
184      */
185     public synchronized void removeAndCloseAllListeners() {
186         final long stampNotifications = notificationListenersLock.writeLock();
187         final long stampDataChanges = dataChangeListenersLock.writeLock();
188         try {
189             removeAndCloseAllDataChangeListenersTemplate();
190             removeAndCloseAllNotificationListenersTemplate();
191         } finally {
192             dataChangeListenersLock.unlockWrite(stampDataChanges);
193             notificationListenersLock.unlockWrite(stampNotifications);
194         }
195     }
196
197     /**
198      * Closes and removes all data-change listeners.
199      */
200     public void removeAndCloseAllDataChangeListeners() {
201         final long stamp = dataChangeListenersLock.writeLock();
202         try {
203             removeAndCloseAllDataChangeListenersTemplate();
204         } finally {
205             dataChangeListenersLock.unlockWrite(stamp);
206         }
207     }
208
209     @SuppressWarnings("checkstyle:IllegalCatch")
210     private void removeAndCloseAllDataChangeListenersTemplate() {
211         dataChangeListeners.values()
212                 .forEach(listenerAdapter -> {
213                     try {
214                         listenerAdapter.close();
215                     } catch (final Exception exception) {
216                         LOG.error("Failed to close data-change listener {}.", listenerAdapter, exception);
217                         throw new IllegalStateException(String.format("Failed to close data-change listener %s.",
218                                 listenerAdapter), exception);
219                     }
220                 });
221         dataChangeListeners.clear();
222     }
223
224     /**
225      * Closes and removes all notification listeners.
226      */
227     public void removeAndCloseAllNotificationListeners() {
228         final long stamp = notificationListenersLock.writeLock();
229         try {
230             removeAndCloseAllNotificationListenersTemplate();
231         } finally {
232             notificationListenersLock.unlockWrite(stamp);
233         }
234     }
235
236     @SuppressWarnings("checkstyle:IllegalCatch")
237     private void removeAndCloseAllNotificationListenersTemplate() {
238         notificationListeners.values()
239                 .forEach(listenerAdapter -> {
240                     try {
241                         listenerAdapter.close();
242                     } catch (final Exception exception) {
243                         LOG.error("Failed to close notification listener {}.", listenerAdapter, exception);
244                         throw new IllegalStateException(String.format("Failed to close notification listener %s.",
245                                 listenerAdapter), exception);
246                     }
247                 });
248         notificationListeners.clear();
249     }
250
251     /**
252      * Removes and closes data-change listener of type {@link ListenerAdapter} specified in parameter.
253      *
254      * @param listener Listener to be closed and removed.
255      */
256     @SuppressWarnings("checkstyle:IllegalCatch")
257     public void removeAndCloseDataChangeListener(final ListenerAdapter listener) {
258         final long stamp = dataChangeListenersLock.writeLock();
259         try {
260             removeAndCloseDataChangeListenerTemplate(listener);
261         } catch (final Exception exception) {
262             LOG.error("Data-change listener {} cannot be closed.", listener, exception);
263         } finally {
264             dataChangeListenersLock.unlockWrite(stamp);
265         }
266     }
267
268     /**
269      * Removes and closes data-change listener of type {@link ListenerAdapter} specified in parameter.
270      *
271      * @param listener Listener to be closed and removed.
272      */
273     @SuppressWarnings("checkstyle:IllegalCatch")
274     private void removeAndCloseDataChangeListenerTemplate(final ListenerAdapter listener) {
275         try {
276             requireNonNull(listener).close();
277             if (dataChangeListeners.inverse().remove(listener) == null) {
278                 LOG.warn("There isn't any data-change event stream that would match listener adapter {}.", listener);
279             }
280         } catch (final Exception exception) {
281             LOG.error("Data-change listener {} cannot be closed.", listener, exception);
282             throw new IllegalStateException(String.format(
283                     "Data-change listener %s cannot be closed.",
284                     listener), exception);
285         }
286     }
287
288     /**
289      * Removes and closes notification listener of type {@link NotificationListenerAdapter} specified in parameter.
290      *
291      * @param listener Listener to be closed and removed.
292      */
293     @SuppressWarnings("checkstyle:IllegalCatch")
294     public void removeAndCloseNotificationListener(final NotificationListenerAdapter listener) {
295         final long stamp = notificationListenersLock.writeLock();
296         try {
297             removeAndCloseNotificationListenerTemplate(listener);
298         } catch (final Exception exception) {
299             LOG.error("Notification listener {} cannot be closed.", listener, exception);
300         } finally {
301             notificationListenersLock.unlockWrite(stamp);
302         }
303     }
304
305     @SuppressWarnings({"checkstyle:IllegalCatch"})
306     private void removeAndCloseNotificationListenerTemplate(NotificationListenerAdapter listener) {
307         try {
308             requireNonNull(listener).close();
309             if (notificationListeners.inverse().remove(listener) == null) {
310                 LOG.warn("There isn't any notification stream that would match listener adapter {}.", listener);
311             }
312         } catch (final Exception exception) {
313             LOG.error("Notification listener {} cannot be closed.", listener, exception);
314             throw new IllegalStateException(String.format(
315                     "Notification listener %s cannot be closed.", listener),
316                     exception);
317         }
318     }
319
320     /**
321      * Removal and closing of general listener (data-change or notification listener).
322      *
323      * @param listener Listener to be closed and removed from cache.
324      */
325     void removeAndCloseListener(final BaseListenerInterface listener) {
326         requireNonNull(listener);
327         if (listener instanceof ListenerAdapter) {
328             removeAndCloseDataChangeListener((ListenerAdapter) listener);
329         } else if (listener instanceof NotificationListenerAdapter) {
330             removeAndCloseNotificationListener((NotificationListenerAdapter) listener);
331         }
332     }
333
334     /**
335      * Creates string representation of stream name from URI. Removes slash from URI in start and end positions,
336      * and optionally {@link RestconfConstants#BASE_URI_PATTERN} prefix.
337      *
338      * @param uri URI for creation of stream name.
339      * @return String representation of stream name.
340      */
341     public static String createStreamNameFromUri(final String uri) {
342         String result = requireNonNull(uri);
343         while (true) {
344             if (result.startsWith(RestconfConstants.BASE_URI_PATTERN)) {
345                 result = result.substring(RestconfConstants.BASE_URI_PATTERN.length());
346             } else if (result.startsWith("/")) {
347                 result = result.substring(1);
348             } else {
349                 break;
350             }
351         }
352         if (result.endsWith("/")) {
353             result = result.substring(0, result.length() - 1);
354         }
355         return result;
356     }
357
358     @VisibleForTesting
359     public synchronized void setDataChangeListeners(final Map<String, ListenerAdapter> listenerAdapterCollection) {
360         final long stamp = dataChangeListenersLock.writeLock();
361         try {
362             dataChangeListeners.clear();
363             dataChangeListeners.putAll(listenerAdapterCollection);
364         } finally {
365             dataChangeListenersLock.unlockWrite(stamp);
366         }
367     }
368 }