2 * Copyright © 2019 FRINX s.r.o. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.restconf.nb.rfc8040.streams.listeners;
11 import static java.util.Objects.requireNonNull;
13 import com.google.common.annotations.VisibleForTesting;
14 import com.google.common.collect.BiMap;
15 import com.google.common.collect.HashBiMap;
16 import com.google.common.collect.ImmutableSet;
18 import java.util.Optional;
20 import java.util.concurrent.locks.StampedLock;
21 import java.util.function.Function;
22 import org.opendaylight.restconf.nb.rfc8040.rests.utils.RestconfStreamsConstants;
23 import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev140708.NotificationOutputTypeGrouping.NotificationOutputType;
24 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
25 import org.opendaylight.yangtools.yang.model.api.NotificationDefinition;
26 import org.opendaylight.yangtools.yang.model.api.SchemaPath;
27 import org.slf4j.Logger;
28 import org.slf4j.LoggerFactory;
31 * This singleton class is responsible for creation, removal and searching for {@link ListenerAdapter} or
32 * {@link NotificationListenerAdapter} listeners.
34 public final class ListenersBroker {
35 private static final Logger LOG = LoggerFactory.getLogger(ListenersBroker.class);
36 private static ListenersBroker listenersBroker;
38 private final StampedLock dataChangeListenersLock = new StampedLock();
39 private final StampedLock notificationListenersLock = new StampedLock();
40 private final BiMap<String, ListenerAdapter> dataChangeListeners = HashBiMap.create();
41 private final BiMap<String, NotificationListenerAdapter> notificationListeners = HashBiMap.create();
43 private ListenersBroker() {
47 * Creation of the singleton listeners broker.
49 * @return Reusable instance of {@link ListenersBroker}.
51 public static synchronized ListenersBroker getInstance() {
52 if (listenersBroker == null) {
53 listenersBroker = new ListenersBroker();
55 return listenersBroker;
59 * Returns set of all data-change-event streams.
61 public Set<String> getDataChangeStreams() {
62 final long stamp = dataChangeListenersLock.readLock();
64 return ImmutableSet.copyOf(dataChangeListeners.keySet());
66 dataChangeListenersLock.unlockRead(stamp);
71 * Returns set of all notification streams.
73 public Set<String> getNotificationStreams() {
74 final long stamp = notificationListenersLock.readLock();
76 return ImmutableSet.copyOf(notificationListeners.keySet());
78 notificationListenersLock.unlockRead(stamp);
83 * Gets {@link ListenerAdapter} specified by stream identification.
85 * @param streamName Stream name.
86 * @return {@link ListenerAdapter} specified by stream name wrapped in {@link Optional} or {@link Optional#empty()}
87 * if listener with specified stream name doesn't exist.
89 public Optional<ListenerAdapter> getDataChangeListenerFor(final String streamName) {
90 final long stamp = dataChangeListenersLock.readLock();
92 final ListenerAdapter listenerAdapter = dataChangeListeners.get(requireNonNull(streamName));
93 return Optional.ofNullable(listenerAdapter);
95 dataChangeListenersLock.unlockRead(stamp);
100 * Gets {@link NotificationListenerAdapter} specified by stream name.
102 * @param streamName Stream name.
103 * @return {@link NotificationListenerAdapter} specified by stream name wrapped in {@link Optional}
104 * or {@link Optional#empty()} if listener with specified stream name doesn't exist.
106 public Optional<NotificationListenerAdapter> getNotificationListenerFor(final String streamName) {
107 final long stamp = notificationListenersLock.readLock();
109 final NotificationListenerAdapter listenerAdapter = notificationListeners.get(requireNonNull(streamName));
110 return Optional.ofNullable(listenerAdapter);
112 notificationListenersLock.unlockRead(stamp);
117 * Get listener for stream-name.
119 * @param streamName Stream name.
120 * @return {@link NotificationListenerAdapter} or {@link ListenerAdapter} object wrapped in {@link Optional}
121 * or {@link Optional#empty()} if listener with specified stream name doesn't exist.
123 public Optional<BaseListenerInterface> getListenerFor(final String streamName) {
124 if (streamName.startsWith(RestconfStreamsConstants.NOTIFICATION_STREAM)) {
125 return getNotificationListenerFor(streamName).map(Function.identity());
126 } else if (streamName.startsWith(RestconfStreamsConstants.DATA_SUBSCRIPTION)) {
127 return getDataChangeListenerFor(streamName).map(Function.identity());
129 return Optional.empty();
134 * Creates new {@link ListenerAdapter} listener using input stream name and path if such listener
135 * hasn't been created yet.
137 * @param path Path to data in data repository.
138 * @param streamName Stream name.
139 * @param outputType Specific type of output for notifications - XML or JSON.
140 * @return Created or existing data-change listener adapter.
142 public ListenerAdapter registerDataChangeListener(final YangInstanceIdentifier path, final String streamName,
143 final NotificationOutputType outputType) {
144 requireNonNull(path);
145 requireNonNull(streamName);
146 requireNonNull(outputType);
148 final long stamp = dataChangeListenersLock.writeLock();
150 return dataChangeListeners.computeIfAbsent(streamName, stream -> new ListenerAdapter(
151 path, stream, outputType));
153 dataChangeListenersLock.unlockWrite(stamp);
158 * Creates new {@link NotificationDefinition} listener using input stream name and schema path
159 * if such listener haven't been created yet.
161 * @param schemaPath Schema path of YANG notification structure.
162 * @param streamName Stream name.
163 * @param outputType Specific type of output for notifications - XML or JSON.
164 * @return Created or existing notification listener adapter.
166 public NotificationListenerAdapter registerNotificationListener(final SchemaPath schemaPath,
167 final String streamName, final NotificationOutputType outputType) {
168 requireNonNull(schemaPath);
169 requireNonNull(streamName);
170 requireNonNull(outputType);
172 final long stamp = notificationListenersLock.writeLock();
174 return notificationListeners.computeIfAbsent(streamName, stream -> new NotificationListenerAdapter(
175 schemaPath, stream, outputType.getName()));
177 notificationListenersLock.unlockWrite(stamp);
182 * Removal and closing of all data-change-event and notification listeners.
184 public synchronized void removeAndCloseAllListeners() {
185 final long stampNotifications = notificationListenersLock.writeLock();
186 final long stampDataChanges = dataChangeListenersLock.writeLock();
188 removeAndCloseAllDataChangeListenersTemplate();
189 removeAndCloseAllNotificationListenersTemplate();
191 dataChangeListenersLock.unlockWrite(stampDataChanges);
192 notificationListenersLock.unlockWrite(stampNotifications);
197 * Closes and removes all data-change listeners.
199 public void removeAndCloseAllDataChangeListeners() {
200 final long stamp = dataChangeListenersLock.writeLock();
202 removeAndCloseAllDataChangeListenersTemplate();
204 dataChangeListenersLock.unlockWrite(stamp);
208 @SuppressWarnings("checkstyle:IllegalCatch")
209 private void removeAndCloseAllDataChangeListenersTemplate() {
210 dataChangeListeners.values()
211 .forEach(listenerAdapter -> {
213 listenerAdapter.close();
214 } catch (final Exception exception) {
215 LOG.error("Failed to close data-change listener {}.", listenerAdapter, exception);
216 throw new IllegalStateException(String.format("Failed to close data-change listener %s.",
217 listenerAdapter), exception);
220 dataChangeListeners.clear();
224 * Closes and removes all notification listeners.
226 public void removeAndCloseAllNotificationListeners() {
227 final long stamp = notificationListenersLock.writeLock();
229 removeAndCloseAllNotificationListenersTemplate();
231 notificationListenersLock.unlockWrite(stamp);
235 @SuppressWarnings("checkstyle:IllegalCatch")
236 private void removeAndCloseAllNotificationListenersTemplate() {
237 notificationListeners.values()
238 .forEach(listenerAdapter -> {
240 listenerAdapter.close();
241 } catch (final Exception exception) {
242 LOG.error("Failed to close notification listener {}.", listenerAdapter, exception);
243 throw new IllegalStateException(String.format("Failed to close notification listener %s.",
244 listenerAdapter), exception);
247 notificationListeners.clear();
251 * Removes and closes data-change listener of type {@link ListenerAdapter} specified in parameter.
253 * @param listener Listener to be closed and removed.
255 @SuppressWarnings("checkstyle:IllegalCatch")
256 public void removeAndCloseDataChangeListener(final ListenerAdapter listener) {
257 final long stamp = dataChangeListenersLock.writeLock();
259 removeAndCloseDataChangeListenerTemplate(listener);
260 } catch (final Exception exception) {
261 LOG.error("Data-change listener {} cannot be closed.", listener, exception);
263 dataChangeListenersLock.unlockWrite(stamp);
268 * Removes and closes data-change listener of type {@link ListenerAdapter} specified in parameter.
270 * @param listener Listener to be closed and removed.
272 @SuppressWarnings("checkstyle:IllegalCatch")
273 private void removeAndCloseDataChangeListenerTemplate(final ListenerAdapter listener) {
274 final long stamp = dataChangeListenersLock.writeLock();
276 requireNonNull(listener).close();
277 if (dataChangeListeners.inverse().remove(listener) == null) {
278 LOG.warn("There isn't any data-change event stream that would match listener adapter {}.", listener);
280 } catch (final Exception exception) {
281 LOG.error("Data-change listener {} cannot be closed.", listener, exception);
282 throw new IllegalStateException(String.format(
283 "Data-change listener %s cannot be closed.",
284 listener), exception);
286 dataChangeListenersLock.unlockWrite(stamp);
291 * Removes and closes notification listener of type {@link NotificationListenerAdapter} specified in parameter.
293 * @param listener Listener to be closed and removed.
295 @SuppressWarnings("checkstyle:IllegalCatch")
296 public void removeAndCloseNotificationListener(final NotificationListenerAdapter listener) {
297 final long stamp = notificationListenersLock.writeLock();
299 removeAndCloseNotificationListenerTemplate(listener);
300 } catch (final Exception exception) {
301 LOG.error("Notification listener {} cannot be closed.", listener, exception);
303 notificationListenersLock.unlockWrite(stamp);
307 @SuppressWarnings({"checkstyle:IllegalCatch"})
308 private void removeAndCloseNotificationListenerTemplate(NotificationListenerAdapter listener) {
310 requireNonNull(listener).close();
311 if (notificationListeners.inverse().remove(listener) == null) {
312 LOG.warn("There isn't any notification stream that would match listener adapter {}.", listener);
314 } catch (final Exception exception) {
315 LOG.error("Notification listener {} cannot be closed.", listener, exception);
316 throw new IllegalStateException(String.format(
317 "Notification listener %s cannot be closed.", listener),
323 * Removal and closing of general listener (data-change or notification listener).
325 * @param listener Listener to be closed and removed from cache.
327 void removeAndCloseListener(final BaseListenerInterface listener) {
328 requireNonNull(listener);
329 if (listener instanceof ListenerAdapter) {
330 removeAndCloseDataChangeListener((ListenerAdapter) listener);
331 } else if (listener instanceof NotificationListenerAdapter) {
332 removeAndCloseNotificationListener((NotificationListenerAdapter) listener);
337 * Creates string representation of stream name from URI. Removes slash from URI in start and end positions.
339 * @param uri URI for creation of stream name.
340 * @return String representation of stream name.
342 public static String createStreamNameFromUri(final String uri) {
343 String result = requireNonNull(uri);
344 if (result.startsWith("/")) {
345 result = result.substring(1);
347 if (result.endsWith("/")) {
348 result = result.substring(0, result.length() - 1);
354 public synchronized void setDataChangeListeners(final Map<String, ListenerAdapter> listenerAdapterCollection) {
355 final long stamp = dataChangeListenersLock.writeLock();
357 dataChangeListeners.clear();
358 dataChangeListeners.putAll(listenerAdapterCollection);
360 dataChangeListenersLock.unlockWrite(stamp);