Switch to using SseEventSink
[netconf.git] / restconf / restconf-nb-rfc8040 / src / main / java / org / opendaylight / restconf / nb / rfc8040 / streams / listeners / ListenersBroker.java
1 /*
2  * Copyright © 2019 FRINX s.r.o. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.restconf.nb.rfc8040.streams.listeners;
10
11 import static java.util.Objects.requireNonNull;
12
13 import com.google.common.annotations.VisibleForTesting;
14 import com.google.common.collect.BiMap;
15 import com.google.common.collect.HashBiMap;
16 import com.google.common.collect.ImmutableSet;
17 import java.util.Map;
18 import java.util.Optional;
19 import java.util.Set;
20 import java.util.concurrent.locks.StampedLock;
21 import java.util.function.Function;
22 import org.opendaylight.restconf.nb.rfc8040.rests.utils.RestconfStreamsConstants;
23 import org.opendaylight.restconf.nb.rfc8040.utils.RestconfConstants;
24 import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev140708.NotificationOutputTypeGrouping.NotificationOutputType;
25 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
26 import org.opendaylight.yangtools.yang.model.api.NotificationDefinition;
27 import org.opendaylight.yangtools.yang.model.api.stmt.SchemaNodeIdentifier.Absolute;
28 import org.slf4j.Logger;
29 import org.slf4j.LoggerFactory;
30
31 /**
32  * This singleton class is responsible for creation, removal and searching for {@link ListenerAdapter} or
33  * {@link NotificationListenerAdapter} listeners.
34  */
35 public final class ListenersBroker {
36     private static final Logger LOG = LoggerFactory.getLogger(ListenersBroker.class);
37     private static ListenersBroker listenersBroker;
38
39     private final StampedLock dataChangeListenersLock = new StampedLock();
40     private final StampedLock notificationListenersLock = new StampedLock();
41     private final BiMap<String, ListenerAdapter> dataChangeListeners = HashBiMap.create();
42     private final BiMap<String, NotificationListenerAdapter> notificationListeners = HashBiMap.create();
43
44     private ListenersBroker() {
45     }
46
47     /**
48      * Creation of the singleton listeners broker.
49      *
50      * @return Reusable instance of {@link ListenersBroker}.
51      */
52     // FIXME: remove this global singleton
53     public static synchronized ListenersBroker getInstance() {
54         if (listenersBroker == null) {
55             listenersBroker = new ListenersBroker();
56         }
57         return listenersBroker;
58     }
59
60     /**
61      * Returns set of all data-change-event streams.
62      */
63     public Set<String> getDataChangeStreams() {
64         final long stamp = dataChangeListenersLock.readLock();
65         try {
66             return ImmutableSet.copyOf(dataChangeListeners.keySet());
67         } finally {
68             dataChangeListenersLock.unlockRead(stamp);
69         }
70     }
71
72     /**
73      * Returns set of all notification streams.
74      */
75     public Set<String> getNotificationStreams() {
76         final long stamp = notificationListenersLock.readLock();
77         try {
78             return ImmutableSet.copyOf(notificationListeners.keySet());
79         } finally {
80             notificationListenersLock.unlockRead(stamp);
81         }
82     }
83
84     /**
85      * Gets {@link ListenerAdapter} specified by stream identification.
86      *
87      * @param streamName Stream name.
88      * @return {@link ListenerAdapter} specified by stream name wrapped in {@link Optional} or {@link Optional#empty()}
89      *     if listener with specified stream name doesn't exist.
90      */
91     public Optional<ListenerAdapter> getDataChangeListenerFor(final String streamName) {
92         final long stamp = dataChangeListenersLock.readLock();
93         try {
94             final ListenerAdapter listenerAdapter = dataChangeListeners.get(requireNonNull(streamName));
95             return Optional.ofNullable(listenerAdapter);
96         } finally {
97             dataChangeListenersLock.unlockRead(stamp);
98         }
99     }
100
101     /**
102      * Gets {@link NotificationListenerAdapter} specified by stream name.
103      *
104      * @param streamName Stream name.
105      * @return {@link NotificationListenerAdapter} specified by stream name wrapped in {@link Optional}
106      *     or {@link Optional#empty()} if listener with specified stream name doesn't exist.
107      */
108     public Optional<NotificationListenerAdapter> getNotificationListenerFor(final String streamName) {
109         final long stamp = notificationListenersLock.readLock();
110         try {
111             final NotificationListenerAdapter listenerAdapter = notificationListeners.get(requireNonNull(streamName));
112             return Optional.ofNullable(listenerAdapter);
113         } finally {
114             notificationListenersLock.unlockRead(stamp);
115         }
116     }
117
118     /**
119      * Get listener for stream-name.
120      *
121      * @param streamName Stream name.
122      * @return {@link NotificationListenerAdapter} or {@link ListenerAdapter} object wrapped in {@link Optional}
123      *     or {@link Optional#empty()} if listener with specified stream name doesn't exist.
124      */
125     public Optional<BaseListenerInterface> getListenerFor(final String streamName) {
126         if (streamName.startsWith(RestconfStreamsConstants.NOTIFICATION_STREAM)) {
127             return getNotificationListenerFor(streamName).map(Function.identity());
128         } else if (streamName.startsWith(RestconfStreamsConstants.DATA_SUBSCRIPTION)) {
129             return getDataChangeListenerFor(streamName).map(Function.identity());
130         } else {
131             return Optional.empty();
132         }
133     }
134
135     /**
136      * Creates new {@link ListenerAdapter} listener using input stream name and path if such listener
137      * hasn't been created yet.
138      *
139      * @param path       Path to data in data repository.
140      * @param streamName Stream name.
141      * @param outputType Specific type of output for notifications - XML or JSON.
142      * @return Created or existing data-change listener adapter.
143      */
144     public ListenerAdapter registerDataChangeListener(final YangInstanceIdentifier path, final String streamName,
145             final NotificationOutputType outputType) {
146         requireNonNull(path);
147         requireNonNull(streamName);
148         requireNonNull(outputType);
149
150         final long stamp = dataChangeListenersLock.writeLock();
151         try {
152             return dataChangeListeners.computeIfAbsent(streamName, stream -> new ListenerAdapter(
153                     path, stream, outputType));
154         } finally {
155             dataChangeListenersLock.unlockWrite(stamp);
156         }
157     }
158
159     /**
160      * Creates new {@link NotificationDefinition} listener using input stream name and schema path
161      * if such listener haven't been created yet.
162      *
163      * @param schemaPath Schema path of YANG notification structure.
164      * @param streamName Stream name.
165      * @param outputType Specific type of output for notifications - XML or JSON.
166      * @return Created or existing notification listener adapter.
167      */
168     public NotificationListenerAdapter registerNotificationListener(final Absolute schemaPath,
169             final String streamName, final NotificationOutputType outputType) {
170         requireNonNull(schemaPath);
171         requireNonNull(streamName);
172         requireNonNull(outputType);
173
174         final long stamp = notificationListenersLock.writeLock();
175         try {
176             return notificationListeners.computeIfAbsent(streamName, stream -> new NotificationListenerAdapter(
177                     schemaPath, stream, outputType.getName()));
178         } finally {
179             notificationListenersLock.unlockWrite(stamp);
180         }
181     }
182
183     /**
184      * Removal and closing of all data-change-event and notification listeners.
185      */
186     public synchronized void removeAndCloseAllListeners() {
187         final long stampNotifications = notificationListenersLock.writeLock();
188         final long stampDataChanges = dataChangeListenersLock.writeLock();
189         try {
190             removeAndCloseAllDataChangeListenersTemplate();
191             removeAndCloseAllNotificationListenersTemplate();
192         } finally {
193             dataChangeListenersLock.unlockWrite(stampDataChanges);
194             notificationListenersLock.unlockWrite(stampNotifications);
195         }
196     }
197
198     /**
199      * Closes and removes all data-change listeners.
200      */
201     public void removeAndCloseAllDataChangeListeners() {
202         final long stamp = dataChangeListenersLock.writeLock();
203         try {
204             removeAndCloseAllDataChangeListenersTemplate();
205         } finally {
206             dataChangeListenersLock.unlockWrite(stamp);
207         }
208     }
209
210     @SuppressWarnings("checkstyle:IllegalCatch")
211     private void removeAndCloseAllDataChangeListenersTemplate() {
212         dataChangeListeners.values()
213                 .forEach(listenerAdapter -> {
214                     try {
215                         listenerAdapter.close();
216                     } catch (final Exception exception) {
217                         LOG.error("Failed to close data-change listener {}.", listenerAdapter, exception);
218                         throw new IllegalStateException(String.format("Failed to close data-change listener %s.",
219                                 listenerAdapter), exception);
220                     }
221                 });
222         dataChangeListeners.clear();
223     }
224
225     /**
226      * Closes and removes all notification listeners.
227      */
228     public void removeAndCloseAllNotificationListeners() {
229         final long stamp = notificationListenersLock.writeLock();
230         try {
231             removeAndCloseAllNotificationListenersTemplate();
232         } finally {
233             notificationListenersLock.unlockWrite(stamp);
234         }
235     }
236
237     @SuppressWarnings("checkstyle:IllegalCatch")
238     private void removeAndCloseAllNotificationListenersTemplate() {
239         notificationListeners.values()
240                 .forEach(listenerAdapter -> {
241                     try {
242                         listenerAdapter.close();
243                     } catch (final Exception exception) {
244                         LOG.error("Failed to close notification listener {}.", listenerAdapter, exception);
245                         throw new IllegalStateException(String.format("Failed to close notification listener %s.",
246                                 listenerAdapter), exception);
247                     }
248                 });
249         notificationListeners.clear();
250     }
251
252     /**
253      * Removes and closes data-change listener of type {@link ListenerAdapter} specified in parameter.
254      *
255      * @param listener Listener to be closed and removed.
256      */
257     @SuppressWarnings("checkstyle:IllegalCatch")
258     public void removeAndCloseDataChangeListener(final ListenerAdapter listener) {
259         final long stamp = dataChangeListenersLock.writeLock();
260         try {
261             removeAndCloseDataChangeListenerTemplate(listener);
262         } catch (final Exception exception) {
263             LOG.error("Data-change listener {} cannot be closed.", listener, exception);
264         } finally {
265             dataChangeListenersLock.unlockWrite(stamp);
266         }
267     }
268
269     /**
270      * Removes and closes data-change listener of type {@link ListenerAdapter} specified in parameter.
271      *
272      * @param listener Listener to be closed and removed.
273      */
274     @SuppressWarnings("checkstyle:IllegalCatch")
275     private void removeAndCloseDataChangeListenerTemplate(final ListenerAdapter listener) {
276         try {
277             requireNonNull(listener).close();
278             if (dataChangeListeners.inverse().remove(listener) == null) {
279                 LOG.warn("There isn't any data-change event stream that would match listener adapter {}.", listener);
280             }
281         } catch (final Exception exception) {
282             LOG.error("Data-change listener {} cannot be closed.", listener, exception);
283             throw new IllegalStateException(String.format(
284                     "Data-change listener %s cannot be closed.",
285                     listener), exception);
286         }
287     }
288
289     /**
290      * Removes and closes notification listener of type {@link NotificationListenerAdapter} specified in parameter.
291      *
292      * @param listener Listener to be closed and removed.
293      */
294     @SuppressWarnings("checkstyle:IllegalCatch")
295     public void removeAndCloseNotificationListener(final NotificationListenerAdapter listener) {
296         final long stamp = notificationListenersLock.writeLock();
297         try {
298             removeAndCloseNotificationListenerTemplate(listener);
299         } catch (final Exception exception) {
300             LOG.error("Notification listener {} cannot be closed.", listener, exception);
301         } finally {
302             notificationListenersLock.unlockWrite(stamp);
303         }
304     }
305
306     @SuppressWarnings({"checkstyle:IllegalCatch"})
307     private void removeAndCloseNotificationListenerTemplate(final NotificationListenerAdapter listener) {
308         try {
309             requireNonNull(listener).close();
310             if (notificationListeners.inverse().remove(listener) == null) {
311                 LOG.warn("There isn't any notification stream that would match listener adapter {}.", listener);
312             }
313         } catch (final Exception exception) {
314             LOG.error("Notification listener {} cannot be closed.", listener, exception);
315             throw new IllegalStateException(String.format(
316                     "Notification listener %s cannot be closed.", listener),
317                     exception);
318         }
319     }
320
321     /**
322      * Removal and closing of general listener (data-change or notification listener).
323      *
324      * @param listener Listener to be closed and removed from cache.
325      */
326     void removeAndCloseListener(final BaseListenerInterface listener) {
327         requireNonNull(listener);
328         if (listener instanceof ListenerAdapter) {
329             removeAndCloseDataChangeListener((ListenerAdapter) listener);
330         } else if (listener instanceof NotificationListenerAdapter) {
331             removeAndCloseNotificationListener((NotificationListenerAdapter) listener);
332         }
333     }
334
335     /**
336      * Creates string representation of stream name from URI. Removes slash from URI in start and end positions,
337      * and optionally {@link RestconfConstants#BASE_URI_PATTERN} prefix.
338      *
339      * @param uri URI for creation of stream name.
340      * @return String representation of stream name.
341      */
342     public static String createStreamNameFromUri(final String uri) {
343         String result = requireNonNull(uri);
344         while (true) {
345             if (result.startsWith(RestconfConstants.BASE_URI_PATTERN)) {
346                 result = result.substring(RestconfConstants.BASE_URI_PATTERN.length());
347             } else if (result.startsWith("/")) {
348                 result = result.substring(1);
349             } else {
350                 break;
351             }
352         }
353         if (result.endsWith("/")) {
354             result = result.substring(0, result.length() - 1);
355         }
356         return result;
357     }
358
359     @VisibleForTesting
360     public synchronized void setDataChangeListeners(final Map<String, ListenerAdapter> listenerAdapterCollection) {
361         final long stamp = dataChangeListenersLock.writeLock();
362         try {
363             dataChangeListeners.clear();
364             dataChangeListeners.putAll(listenerAdapterCollection);
365         } finally {
366             dataChangeListenersLock.unlockWrite(stamp);
367         }
368     }
369 }