2 * Copyright © 2019 FRINX s.r.o. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.restconf.nb.rfc8040.streams.listeners;
10 import static java.util.Objects.requireNonNull;
12 import com.google.common.annotations.VisibleForTesting;
13 import com.google.common.collect.BiMap;
14 import com.google.common.collect.HashBiMap;
15 import com.google.common.collect.ImmutableSet;
17 import java.util.Optional;
19 import java.util.concurrent.ExecutionException;
20 import java.util.concurrent.locks.StampedLock;
21 import java.util.function.Function;
22 import org.opendaylight.mdsal.dom.api.DOMMountPointService;
23 import org.opendaylight.restconf.nb.rfc8040.rests.utils.RestconfStreamsConstants;
24 import org.opendaylight.restconf.nb.rfc8040.utils.RestconfConstants;
25 import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev140708.NotificationOutputTypeGrouping.NotificationOutputType;
26 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
27 import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
28 import org.opendaylight.yangtools.yang.model.api.NotificationDefinition;
29 import org.opendaylight.yangtools.yang.model.api.stmt.SchemaNodeIdentifier.Absolute;
30 import org.slf4j.Logger;
31 import org.slf4j.LoggerFactory;
34 * This singleton class is responsible for creation, removal and searching for {@link ListenerAdapter} or
35 * {@link NotificationListenerAdapter} listeners.
37 // FIXME: this should be a component
38 public final class ListenersBroker {
39 private static final class Holder {
40 // FIXME: remove this global singleton
41 static final ListenersBroker INSTANCE = new ListenersBroker();
44 private static final Logger LOG = LoggerFactory.getLogger(ListenersBroker.class);
46 private final StampedLock dataChangeListenersLock = new StampedLock();
47 private final StampedLock notificationListenersLock = new StampedLock();
48 private final StampedLock deviceNotificationListenersLock = new StampedLock();
49 private final BiMap<String, ListenerAdapter> dataChangeListeners = HashBiMap.create();
50 private final BiMap<String, NotificationListenerAdapter> notificationListeners = HashBiMap.create();
51 private final BiMap<String, DeviceNotificationListenerAdaptor> deviceNotificationListeners = HashBiMap.create();
54 private ListenersBroker() {
59 * Creation of the singleton listeners broker.
61 * @return Reusable instance of {@link ListenersBroker}.
63 public static ListenersBroker getInstance() {
64 return Holder.INSTANCE;
68 * Returns set of all data-change-event streams.
70 public Set<String> getDataChangeStreams() {
71 final long stamp = dataChangeListenersLock.readLock();
73 return ImmutableSet.copyOf(dataChangeListeners.keySet());
75 dataChangeListenersLock.unlockRead(stamp);
80 * Returns set of all notification streams.
82 public Set<String> getNotificationStreams() {
83 final long stamp = notificationListenersLock.readLock();
85 return ImmutableSet.copyOf(notificationListeners.keySet());
87 notificationListenersLock.unlockRead(stamp);
92 * Gets {@link ListenerAdapter} specified by stream identification.
94 * @param streamName Stream name.
95 * @return {@link ListenerAdapter} specified by stream name wrapped in {@link Optional} or {@link Optional#empty()}
96 * if listener with specified stream name doesn't exist.
98 public Optional<ListenerAdapter> getDataChangeListenerFor(final String streamName) {
99 final long stamp = dataChangeListenersLock.readLock();
101 return Optional.ofNullable(dataChangeListeners.get(requireNonNull(streamName)));
103 dataChangeListenersLock.unlockRead(stamp);
108 * Gets {@link NotificationListenerAdapter} specified by stream name.
110 * @param streamName Stream name.
111 * @return {@link NotificationListenerAdapter} specified by stream name wrapped in {@link Optional}
112 * or {@link Optional#empty()} if listener with specified stream name doesn't exist.
114 public Optional<NotificationListenerAdapter> getNotificationListenerFor(final String streamName) {
115 final long stamp = notificationListenersLock.readLock();
117 return Optional.ofNullable(notificationListeners.get(requireNonNull(streamName)));
119 notificationListenersLock.unlockRead(stamp);
124 * Get listener for device path.
127 * @return {@link NotificationListenerAdapter} or {@link ListenerAdapter} object wrapped in {@link Optional}
128 * or {@link Optional#empty()} if listener with specified path doesn't exist.
130 public Optional<BaseListenerInterface> getDeviceNotificationListenerFor(final String path) {
131 final long stamp = deviceNotificationListenersLock.readLock();
133 return Optional.ofNullable(deviceNotificationListeners.get(requireNonNull(path)));
135 deviceNotificationListenersLock.unlockRead(stamp);
140 * Get listener for stream-name.
142 * @param streamName Stream name.
143 * @return {@link NotificationListenerAdapter} or {@link ListenerAdapter} object wrapped in {@link Optional}
144 * or {@link Optional#empty()} if listener with specified stream name doesn't exist.
146 public Optional<BaseListenerInterface> getListenerFor(final String streamName) {
147 if (streamName.startsWith(RestconfStreamsConstants.NOTIFICATION_STREAM)) {
148 return getNotificationListenerFor(streamName).map(Function.identity());
149 } else if (streamName.startsWith(RestconfStreamsConstants.DATA_SUBSCRIPTION)) {
150 return getDataChangeListenerFor(streamName).map(Function.identity());
152 return Optional.empty();
157 * Creates new {@link ListenerAdapter} listener using input stream name and path if such listener
158 * hasn't been created yet.
160 * @param path Path to data in data repository.
161 * @param streamName Stream name.
162 * @param outputType Specific type of output for notifications - XML or JSON.
163 * @return Created or existing data-change listener adapter.
165 public ListenerAdapter registerDataChangeListener(final YangInstanceIdentifier path, final String streamName,
166 final NotificationOutputType outputType) {
167 requireNonNull(path);
168 requireNonNull(streamName);
169 requireNonNull(outputType);
171 final long stamp = dataChangeListenersLock.writeLock();
173 return dataChangeListeners.computeIfAbsent(streamName,
174 stream -> new ListenerAdapter(path, stream, outputType));
176 dataChangeListenersLock.unlockWrite(stamp);
181 * Creates new {@link NotificationDefinition} listener using input stream name and schema path
182 * if such listener haven't been created yet.
184 * @param schemaPath Schema path of YANG notification structure.
185 * @param streamName Stream name.
186 * @param outputType Specific type of output for notifications - XML or JSON.
187 * @return Created or existing notification listener adapter.
189 public NotificationListenerAdapter registerNotificationListener(final Absolute schemaPath,
190 final String streamName, final NotificationOutputType outputType) {
191 requireNonNull(schemaPath);
192 requireNonNull(streamName);
193 requireNonNull(outputType);
195 final long stamp = notificationListenersLock.writeLock();
197 return notificationListeners.computeIfAbsent(streamName,
198 stream -> new NotificationListenerAdapter(schemaPath, stream, outputType));
200 notificationListenersLock.unlockWrite(stamp);
205 * Creates new {@link DeviceNotificationListenerAdaptor} listener using input stream name and schema path
206 * if such listener haven't been created yet.
208 * @param streamName Stream name.
209 * @param outputType Specific type of output for notifications - XML or JSON.
210 * @param refSchemaCtx Schema context of node
211 * @param mountPointService Mount point service
212 * @return Created or existing device notification listener adapter.
214 public DeviceNotificationListenerAdaptor registerDeviceNotificationListener(final String streamName,
215 final NotificationOutputType outputType, final EffectiveModelContext refSchemaCtx,
216 final DOMMountPointService mountPointService, final YangInstanceIdentifier path) {
218 final long stamp = deviceNotificationListenersLock.writeLock();
220 return deviceNotificationListeners.computeIfAbsent(streamName,
221 stream -> new DeviceNotificationListenerAdaptor(streamName, outputType, refSchemaCtx,
222 mountPointService, path));
224 deviceNotificationListenersLock.unlockWrite(stamp);
229 * Removal and closing of all data-change-event and notification listeners.
231 public synchronized void removeAndCloseAllListeners() {
232 final long stampNotifications = notificationListenersLock.writeLock();
233 final long stampDataChanges = dataChangeListenersLock.writeLock();
235 removeAndCloseAllDataChangeListenersTemplate();
236 removeAndCloseAllNotificationListenersTemplate();
238 dataChangeListenersLock.unlockWrite(stampDataChanges);
239 notificationListenersLock.unlockWrite(stampNotifications);
244 * Closes and removes all data-change listeners.
246 public void removeAndCloseAllDataChangeListeners() {
247 final long stamp = dataChangeListenersLock.writeLock();
249 removeAndCloseAllDataChangeListenersTemplate();
251 dataChangeListenersLock.unlockWrite(stamp);
255 @SuppressWarnings("checkstyle:IllegalCatch")
256 private void removeAndCloseAllDataChangeListenersTemplate() {
257 dataChangeListeners.values()
258 .forEach(listenerAdapter -> {
260 listenerAdapter.close();
261 } catch (final Exception exception) {
262 LOG.error("Failed to close data-change listener {}.", listenerAdapter, exception);
263 throw new IllegalStateException(String.format("Failed to close data-change listener %s.",
264 listenerAdapter), exception);
267 dataChangeListeners.clear();
271 * Closes and removes all notification listeners.
273 public void removeAndCloseAllNotificationListeners() {
274 final long stamp = notificationListenersLock.writeLock();
276 removeAndCloseAllNotificationListenersTemplate();
278 notificationListenersLock.unlockWrite(stamp);
282 @SuppressWarnings("checkstyle:IllegalCatch")
283 private void removeAndCloseAllNotificationListenersTemplate() {
284 notificationListeners.values()
285 .forEach(listenerAdapter -> {
287 listenerAdapter.close();
288 } catch (final Exception exception) {
289 LOG.error("Failed to close notification listener {}.", listenerAdapter, exception);
290 throw new IllegalStateException(String.format("Failed to close notification listener %s.",
291 listenerAdapter), exception);
294 notificationListeners.clear();
298 * Removes and closes data-change listener of type {@link ListenerAdapter} specified in parameter.
300 * @param listener Listener to be closed and removed.
302 @SuppressWarnings("checkstyle:IllegalCatch")
303 public void removeAndCloseDataChangeListener(final ListenerAdapter listener) {
304 final long stamp = dataChangeListenersLock.writeLock();
306 removeAndCloseDataChangeListenerTemplate(listener);
307 } catch (final Exception exception) {
308 LOG.error("Data-change listener {} cannot be closed.", listener, exception);
310 dataChangeListenersLock.unlockWrite(stamp);
315 * Removes and closes data-change listener of type {@link ListenerAdapter} specified in parameter.
317 * @param listener Listener to be closed and removed.
319 private void removeAndCloseDataChangeListenerTemplate(final ListenerAdapter listener) {
321 requireNonNull(listener).close();
322 if (dataChangeListeners.inverse().remove(listener) == null) {
323 LOG.warn("There isn't any data-change event stream that would match listener adapter {}.", listener);
325 } catch (final InterruptedException | ExecutionException exception) {
326 LOG.error("Data-change listener {} cannot be closed.", listener, exception);
327 throw new IllegalStateException(String.format(
328 "Data-change listener %s cannot be closed.",
329 listener), exception);
334 * Removes and closes notification listener of type {@link NotificationListenerAdapter} specified in parameter.
336 * @param listener Listener to be closed and removed.
338 @SuppressWarnings("checkstyle:IllegalCatch")
339 public void removeAndCloseNotificationListener(final NotificationListenerAdapter listener) {
340 final long stamp = notificationListenersLock.writeLock();
342 removeAndCloseNotificationListenerTemplate(listener);
343 } catch (final Exception exception) {
344 LOG.error("Notification listener {} cannot be closed.", listener, exception);
346 notificationListenersLock.unlockWrite(stamp);
351 * Removes and closes device notification listener of type {@link NotificationListenerAdapter}
352 * specified in parameter.
354 * @param listener Listener to be closed and removed.
356 @SuppressWarnings("checkstyle:IllegalCatch")
357 public void removeAndCloseDeviceNotificationListener(final DeviceNotificationListenerAdaptor listener) {
358 final long stamp = deviceNotificationListenersLock.writeLock();
360 requireNonNull(listener);
361 if (deviceNotificationListeners.inverse().remove(listener) == null) {
362 LOG.warn("There isn't any device notification stream that would match listener adapter {}.", listener);
364 } catch (final Exception exception) {
365 LOG.error("Device Notification listener {} cannot be closed.", listener, exception);
367 deviceNotificationListenersLock.unlockWrite(stamp);
371 private void removeAndCloseNotificationListenerTemplate(final NotificationListenerAdapter listener) {
373 requireNonNull(listener).close();
374 if (notificationListeners.inverse().remove(listener) == null) {
375 LOG.warn("There isn't any notification stream that would match listener adapter {}.", listener);
377 } catch (final InterruptedException | ExecutionException exception) {
378 LOG.error("Notification listener {} cannot be closed.", listener, exception);
379 throw new IllegalStateException(String.format(
380 "Notification listener %s cannot be closed.", listener),
386 * Removal and closing of general listener (data-change or notification listener).
388 * @param listener Listener to be closed and removed from cache.
390 void removeAndCloseListener(final BaseListenerInterface listener) {
391 requireNonNull(listener);
392 if (listener instanceof ListenerAdapter) {
393 removeAndCloseDataChangeListener((ListenerAdapter) listener);
394 } else if (listener instanceof NotificationListenerAdapter) {
395 removeAndCloseNotificationListener((NotificationListenerAdapter) listener);
400 * Creates string representation of stream name from URI. Removes slash from URI in start and end positions,
401 * and optionally {@link RestconfConstants#BASE_URI_PATTERN} prefix.
403 * @param uri URI for creation of stream name.
404 * @return String representation of stream name.
406 public static String createStreamNameFromUri(final String uri) {
407 String result = requireNonNull(uri);
409 if (result.startsWith(RestconfConstants.BASE_URI_PATTERN)) {
410 result = result.substring(RestconfConstants.BASE_URI_PATTERN.length());
411 } else if (result.startsWith("/")) {
412 result = result.substring(1);
417 if (result.endsWith("/")) {
418 result = result.substring(0, result.length() - 1);
424 public synchronized void setDataChangeListeners(final Map<String, ListenerAdapter> listenerAdapterCollection) {
425 final long stamp = dataChangeListenersLock.writeLock();
427 dataChangeListeners.clear();
428 dataChangeListeners.putAll(listenerAdapterCollection);
430 dataChangeListenersLock.unlockWrite(stamp);