2 * Copyright © 2019 FRINX s.r.o. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.restconf.nb.rfc8040.streams.listeners;
10 import static java.util.Objects.requireNonNull;
12 import com.google.common.annotations.VisibleForTesting;
13 import com.google.common.collect.BiMap;
14 import com.google.common.collect.HashBiMap;
15 import com.google.common.collect.ImmutableSet;
17 import java.util.Optional;
19 import java.util.concurrent.ExecutionException;
20 import java.util.concurrent.locks.StampedLock;
21 import java.util.function.Function;
22 import org.opendaylight.restconf.nb.rfc8040.rests.utils.RestconfStreamsConstants;
23 import org.opendaylight.restconf.nb.rfc8040.utils.RestconfConstants;
24 import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev140708.NotificationOutputTypeGrouping.NotificationOutputType;
25 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
26 import org.opendaylight.yangtools.yang.model.api.NotificationDefinition;
27 import org.opendaylight.yangtools.yang.model.api.stmt.SchemaNodeIdentifier.Absolute;
28 import org.slf4j.Logger;
29 import org.slf4j.LoggerFactory;
32 * This singleton class is responsible for creation, removal and searching for {@link ListenerAdapter} or
33 * {@link NotificationListenerAdapter} listeners.
35 // FIXME: this should be a component
36 public final class ListenersBroker {
37 private static final class Holder {
38 // FIXME: remove this global singleton
39 static final ListenersBroker INSTANCE = new ListenersBroker();
42 private static final Logger LOG = LoggerFactory.getLogger(ListenersBroker.class);
44 private final StampedLock dataChangeListenersLock = new StampedLock();
45 private final StampedLock notificationListenersLock = new StampedLock();
46 private final BiMap<String, ListenerAdapter> dataChangeListeners = HashBiMap.create();
47 private final BiMap<String, NotificationListenerAdapter> notificationListeners = HashBiMap.create();
/**
 * Private on purpose: the only instance is the one created by {@link Holder}.
 */
private ListenersBroker() {
    // Nothing to initialize beyond the field initializers.
}
54 * Creation of the singleton listeners broker.
56 * @return Reusable instance of {@link ListenersBroker}.
58 public static ListenersBroker getInstance() {
59 return Holder.INSTANCE;
63 * Returns set of all data-change-event streams.
65 public Set<String> getDataChangeStreams() {
66 final long stamp = dataChangeListenersLock.readLock();
68 return ImmutableSet.copyOf(dataChangeListeners.keySet());
70 dataChangeListenersLock.unlockRead(stamp);
75 * Returns set of all notification streams.
77 public Set<String> getNotificationStreams() {
78 final long stamp = notificationListenersLock.readLock();
80 return ImmutableSet.copyOf(notificationListeners.keySet());
82 notificationListenersLock.unlockRead(stamp);
87 * Gets {@link ListenerAdapter} specified by stream identification.
89 * @param streamName Stream name.
90 * @return {@link ListenerAdapter} specified by stream name wrapped in {@link Optional} or {@link Optional#empty()}
91 * if listener with specified stream name doesn't exist.
93 public Optional<ListenerAdapter> getDataChangeListenerFor(final String streamName) {
94 final long stamp = dataChangeListenersLock.readLock();
96 return Optional.ofNullable(dataChangeListeners.get(requireNonNull(streamName)));
98 dataChangeListenersLock.unlockRead(stamp);
103 * Gets {@link NotificationListenerAdapter} specified by stream name.
105 * @param streamName Stream name.
106 * @return {@link NotificationListenerAdapter} specified by stream name wrapped in {@link Optional}
107 * or {@link Optional#empty()} if listener with specified stream name doesn't exist.
109 public Optional<NotificationListenerAdapter> getNotificationListenerFor(final String streamName) {
110 final long stamp = notificationListenersLock.readLock();
112 return Optional.ofNullable(notificationListeners.get(requireNonNull(streamName)));
114 notificationListenersLock.unlockRead(stamp);
119 * Get listener for stream-name.
121 * @param streamName Stream name.
122 * @return {@link NotificationListenerAdapter} or {@link ListenerAdapter} object wrapped in {@link Optional}
123 * or {@link Optional#empty()} if listener with specified stream name doesn't exist.
125 public Optional<BaseListenerInterface> getListenerFor(final String streamName) {
126 if (streamName.startsWith(RestconfStreamsConstants.NOTIFICATION_STREAM)) {
127 return getNotificationListenerFor(streamName).map(Function.identity());
128 } else if (streamName.startsWith(RestconfStreamsConstants.DATA_SUBSCRIPTION)) {
129 return getDataChangeListenerFor(streamName).map(Function.identity());
131 return Optional.empty();
136 * Creates new {@link ListenerAdapter} listener using input stream name and path if such listener
137 * hasn't been created yet.
139 * @param path Path to data in data repository.
140 * @param streamName Stream name.
141 * @param outputType Specific type of output for notifications - XML or JSON.
142 * @return Created or existing data-change listener adapter.
144 public ListenerAdapter registerDataChangeListener(final YangInstanceIdentifier path, final String streamName,
145 final NotificationOutputType outputType) {
146 requireNonNull(path);
147 requireNonNull(streamName);
148 requireNonNull(outputType);
150 final long stamp = dataChangeListenersLock.writeLock();
152 return dataChangeListeners.computeIfAbsent(streamName,
153 stream -> new ListenerAdapter(path, stream, outputType));
155 dataChangeListenersLock.unlockWrite(stamp);
160 * Creates new {@link NotificationDefinition} listener using input stream name and schema path
161 * if such listener haven't been created yet.
163 * @param schemaPath Schema path of YANG notification structure.
164 * @param streamName Stream name.
165 * @param outputType Specific type of output for notifications - XML or JSON.
166 * @return Created or existing notification listener adapter.
168 public NotificationListenerAdapter registerNotificationListener(final Absolute schemaPath,
169 final String streamName, final NotificationOutputType outputType) {
170 requireNonNull(schemaPath);
171 requireNonNull(streamName);
172 requireNonNull(outputType);
174 final long stamp = notificationListenersLock.writeLock();
176 return notificationListeners.computeIfAbsent(streamName,
177 stream -> new NotificationListenerAdapter(schemaPath, stream, outputType));
179 notificationListenersLock.unlockWrite(stamp);
184 * Removal and closing of all data-change-event and notification listeners.
186 public synchronized void removeAndCloseAllListeners() {
187 final long stampNotifications = notificationListenersLock.writeLock();
188 final long stampDataChanges = dataChangeListenersLock.writeLock();
190 removeAndCloseAllDataChangeListenersTemplate();
191 removeAndCloseAllNotificationListenersTemplate();
193 dataChangeListenersLock.unlockWrite(stampDataChanges);
194 notificationListenersLock.unlockWrite(stampNotifications);
199 * Closes and removes all data-change listeners.
201 public void removeAndCloseAllDataChangeListeners() {
202 final long stamp = dataChangeListenersLock.writeLock();
204 removeAndCloseAllDataChangeListenersTemplate();
206 dataChangeListenersLock.unlockWrite(stamp);
210 @SuppressWarnings("checkstyle:IllegalCatch")
211 private void removeAndCloseAllDataChangeListenersTemplate() {
212 dataChangeListeners.values()
213 .forEach(listenerAdapter -> {
215 listenerAdapter.close();
216 } catch (final Exception exception) {
217 LOG.error("Failed to close data-change listener {}.", listenerAdapter, exception);
218 throw new IllegalStateException(String.format("Failed to close data-change listener %s.",
219 listenerAdapter), exception);
222 dataChangeListeners.clear();
226 * Closes and removes all notification listeners.
228 public void removeAndCloseAllNotificationListeners() {
229 final long stamp = notificationListenersLock.writeLock();
231 removeAndCloseAllNotificationListenersTemplate();
233 notificationListenersLock.unlockWrite(stamp);
237 @SuppressWarnings("checkstyle:IllegalCatch")
238 private void removeAndCloseAllNotificationListenersTemplate() {
239 notificationListeners.values()
240 .forEach(listenerAdapter -> {
242 listenerAdapter.close();
243 } catch (final Exception exception) {
244 LOG.error("Failed to close notification listener {}.", listenerAdapter, exception);
245 throw new IllegalStateException(String.format("Failed to close notification listener %s.",
246 listenerAdapter), exception);
249 notificationListeners.clear();
253 * Removes and closes data-change listener of type {@link ListenerAdapter} specified in parameter.
255 * @param listener Listener to be closed and removed.
257 @SuppressWarnings("checkstyle:IllegalCatch")
258 public void removeAndCloseDataChangeListener(final ListenerAdapter listener) {
259 final long stamp = dataChangeListenersLock.writeLock();
261 removeAndCloseDataChangeListenerTemplate(listener);
262 } catch (final Exception exception) {
263 LOG.error("Data-change listener {} cannot be closed.", listener, exception);
265 dataChangeListenersLock.unlockWrite(stamp);
270 * Removes and closes data-change listener of type {@link ListenerAdapter} specified in parameter.
272 * @param listener Listener to be closed and removed.
274 private void removeAndCloseDataChangeListenerTemplate(final ListenerAdapter listener) {
276 requireNonNull(listener).close();
277 if (dataChangeListeners.inverse().remove(listener) == null) {
278 LOG.warn("There isn't any data-change event stream that would match listener adapter {}.", listener);
280 } catch (final InterruptedException | ExecutionException exception) {
281 LOG.error("Data-change listener {} cannot be closed.", listener, exception);
282 throw new IllegalStateException(String.format(
283 "Data-change listener %s cannot be closed.",
284 listener), exception);
289 * Removes and closes notification listener of type {@link NotificationListenerAdapter} specified in parameter.
291 * @param listener Listener to be closed and removed.
293 @SuppressWarnings("checkstyle:IllegalCatch")
294 public void removeAndCloseNotificationListener(final NotificationListenerAdapter listener) {
295 final long stamp = notificationListenersLock.writeLock();
297 removeAndCloseNotificationListenerTemplate(listener);
298 } catch (final Exception exception) {
299 LOG.error("Notification listener {} cannot be closed.", listener, exception);
301 notificationListenersLock.unlockWrite(stamp);
305 private void removeAndCloseNotificationListenerTemplate(final NotificationListenerAdapter listener) {
307 requireNonNull(listener).close();
308 if (notificationListeners.inverse().remove(listener) == null) {
309 LOG.warn("There isn't any notification stream that would match listener adapter {}.", listener);
311 } catch (final InterruptedException | ExecutionException exception) {
312 LOG.error("Notification listener {} cannot be closed.", listener, exception);
313 throw new IllegalStateException(String.format(
314 "Notification listener %s cannot be closed.", listener),
320 * Removal and closing of general listener (data-change or notification listener).
322 * @param listener Listener to be closed and removed from cache.
324 void removeAndCloseListener(final BaseListenerInterface listener) {
325 requireNonNull(listener);
326 if (listener instanceof ListenerAdapter) {
327 removeAndCloseDataChangeListener((ListenerAdapter) listener);
328 } else if (listener instanceof NotificationListenerAdapter) {
329 removeAndCloseNotificationListener((NotificationListenerAdapter) listener);
334 * Creates string representation of stream name from URI. Removes slash from URI in start and end positions,
335 * and optionally {@link RestconfConstants#BASE_URI_PATTERN} prefix.
337 * @param uri URI for creation of stream name.
338 * @return String representation of stream name.
340 public static String createStreamNameFromUri(final String uri) {
341 String result = requireNonNull(uri);
343 if (result.startsWith(RestconfConstants.BASE_URI_PATTERN)) {
344 result = result.substring(RestconfConstants.BASE_URI_PATTERN.length());
345 } else if (result.startsWith("/")) {
346 result = result.substring(1);
351 if (result.endsWith("/")) {
352 result = result.substring(0, result.length() - 1);
358 public synchronized void setDataChangeListeners(final Map<String, ListenerAdapter> listenerAdapterCollection) {
359 final long stamp = dataChangeListenersLock.writeLock();
361 dataChangeListeners.clear();
362 dataChangeListeners.putAll(listenerAdapterCollection);
364 dataChangeListenersLock.unlockWrite(stamp);