2 * Copyright © 2019 FRINX s.r.o. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.restconf.nb.rfc8040.streams.listeners;
10 import static java.util.Objects.requireNonNull;
12 import com.google.common.collect.BiMap;
13 import com.google.common.collect.HashBiMap;
14 import java.util.Optional;
15 import java.util.concurrent.ExecutionException;
16 import java.util.concurrent.locks.StampedLock;
17 import javax.inject.Inject;
18 import javax.inject.Singleton;
19 import org.eclipse.jdt.annotation.Nullable;
20 import org.opendaylight.mdsal.dom.api.DOMMountPointService;
21 import org.opendaylight.restconf.nb.rfc8040.URLConstants;
22 import org.opendaylight.restconf.nb.rfc8040.rests.utils.RestconfStreamsConstants;
23 import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev140708.NotificationOutputTypeGrouping.NotificationOutputType;
24 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
25 import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
26 import org.opendaylight.yangtools.yang.model.api.NotificationDefinition;
27 import org.opendaylight.yangtools.yang.model.api.stmt.SchemaNodeIdentifier.Absolute;
28 import org.osgi.service.component.annotations.Activate;
29 import org.osgi.service.component.annotations.Component;
30 import org.slf4j.Logger;
31 import org.slf4j.LoggerFactory;
/**
 * This singleton class is responsible for creating, removing and looking up {@link ListenerAdapter},
 * {@link NotificationListenerAdapter} and {@link DeviceNotificationListenerAdaptor} listeners.
 */
// FIXME: furthermore, this should be tied to ietf-restconf-monitoring, as the Strings used in its maps are stream
//        names. We essentially need a component which deals with allocation of stream names and their lifecycle and
//        the contents of /restconf-state/streams.
41 @Component(service = ListenersBroker.class, immediate = true)
42 public final class ListenersBroker {
43 private static final Logger LOG = LoggerFactory.getLogger(ListenersBroker.class);
45 private final StampedLock dataChangeListenersLock = new StampedLock();
46 private final StampedLock notificationListenersLock = new StampedLock();
47 private final StampedLock deviceNotificationListenersLock = new StampedLock();
48 private final BiMap<String, ListenerAdapter> dataChangeListeners = HashBiMap.create();
49 private final BiMap<String, NotificationListenerAdapter> notificationListeners = HashBiMap.create();
50 private final BiMap<String, DeviceNotificationListenerAdaptor> deviceNotificationListeners = HashBiMap.create();
54 public ListenersBroker() {
59 * Gets {@link ListenerAdapter} specified by stream identification.
61 * @param streamName Stream name.
62 * @return {@link ListenerAdapter} specified by stream name or {@code null} if listener with specified stream name
64 * @throws NullPointerException in {@code streamName} is {@code null}
66 public @Nullable ListenerAdapter dataChangeListenerFor(final String streamName) {
67 requireNonNull(streamName);
69 final long stamp = dataChangeListenersLock.readLock();
71 return dataChangeListeners.get(streamName);
73 dataChangeListenersLock.unlockRead(stamp);
78 * Gets {@link NotificationListenerAdapter} specified by stream name.
80 * @param streamName Stream name.
81 * @return {@link NotificationListenerAdapter} specified by stream name or {@code null} if listener with specified
82 * stream name does not exist.
83 * @throws NullPointerException in {@code streamName} is {@code null}
85 public @Nullable NotificationListenerAdapter notificationListenerFor(final String streamName) {
86 requireNonNull(streamName);
88 final long stamp = notificationListenersLock.readLock();
90 return notificationListeners.get(streamName);
92 notificationListenersLock.unlockRead(stamp);
97 * Get listener for device path.
100 * @return {@link BaseListenerInterface} specified by stream name or {@code null} if listener with specified
101 * stream name does not exist.
102 * @throws NullPointerException in {@code path} is {@code null}
104 public @Nullable BaseListenerInterface deviceNotificationListenerFor(final String path) {
105 requireNonNull(path);
107 final long stamp = deviceNotificationListenersLock.readLock();
109 return deviceNotificationListeners.get(path);
111 deviceNotificationListenersLock.unlockRead(stamp);
116 * Get listener for stream-name.
118 * @param streamName Stream name.
119 * @return {@link NotificationListenerAdapter} or {@link ListenerAdapter} object wrapped in {@link Optional}
120 * or {@link Optional#empty()} if listener with specified stream name doesn't exist.
122 public @Nullable BaseListenerInterface listenerFor(final String streamName) {
123 if (streamName.startsWith(RestconfStreamsConstants.NOTIFICATION_STREAM)) {
124 return notificationListenerFor(streamName);
125 } else if (streamName.startsWith(RestconfStreamsConstants.DATA_SUBSCRIPTION)) {
126 return dataChangeListenerFor(streamName);
133 * Creates new {@link ListenerAdapter} listener using input stream name and path if such listener
134 * hasn't been created yet.
136 * @param path Path to data in data repository.
137 * @param streamName Stream name.
138 * @param outputType Specific type of output for notifications - XML or JSON.
139 * @return Created or existing data-change listener adapter.
141 public ListenerAdapter registerDataChangeListener(final YangInstanceIdentifier path, final String streamName,
142 final NotificationOutputType outputType) {
143 requireNonNull(path);
144 requireNonNull(streamName);
145 requireNonNull(outputType);
147 final long stamp = dataChangeListenersLock.writeLock();
149 return dataChangeListeners.computeIfAbsent(streamName,
150 stream -> new ListenerAdapter(path, stream, outputType, this));
152 dataChangeListenersLock.unlockWrite(stamp);
157 * Creates new {@link NotificationDefinition} listener using input stream name and schema path
158 * if such listener haven't been created yet.
160 * @param schemaPath Schema path of YANG notification structure.
161 * @param streamName Stream name.
162 * @param outputType Specific type of output for notifications - XML or JSON.
163 * @return Created or existing notification listener adapter.
165 public NotificationListenerAdapter registerNotificationListener(final Absolute schemaPath,
166 final String streamName, final NotificationOutputType outputType) {
167 requireNonNull(schemaPath);
168 requireNonNull(streamName);
169 requireNonNull(outputType);
171 final long stamp = notificationListenersLock.writeLock();
173 return notificationListeners.computeIfAbsent(streamName,
174 stream -> new NotificationListenerAdapter(schemaPath, stream, outputType, this));
176 notificationListenersLock.unlockWrite(stamp);
181 * Creates new {@link DeviceNotificationListenerAdaptor} listener using input stream name and schema path
182 * if such listener haven't been created yet.
184 * @param streamName Stream name.
185 * @param outputType Specific type of output for notifications - XML or JSON.
186 * @param refSchemaCtx Schema context of node
187 * @param mountPointService Mount point service
188 * @return Created or existing device notification listener adapter.
190 public DeviceNotificationListenerAdaptor registerDeviceNotificationListener(final String streamName,
191 final NotificationOutputType outputType, final EffectiveModelContext refSchemaCtx,
192 final DOMMountPointService mountPointService, final YangInstanceIdentifier path) {
194 final long stamp = deviceNotificationListenersLock.writeLock();
196 return deviceNotificationListeners.computeIfAbsent(streamName,
197 stream -> new DeviceNotificationListenerAdaptor(streamName, outputType, refSchemaCtx,
198 mountPointService, path, this));
200 deviceNotificationListenersLock.unlockWrite(stamp);
205 * Removal and closing of all data-change-event and notification listeners.
207 public synchronized void removeAndCloseAllListeners() {
208 final long stampNotifications = notificationListenersLock.writeLock();
209 final long stampDataChanges = dataChangeListenersLock.writeLock();
211 removeAndCloseAllDataChangeListenersTemplate();
212 removeAndCloseAllNotificationListenersTemplate();
214 dataChangeListenersLock.unlockWrite(stampDataChanges);
215 notificationListenersLock.unlockWrite(stampNotifications);
220 * Closes and removes all data-change listeners.
222 public void removeAndCloseAllDataChangeListeners() {
223 final long stamp = dataChangeListenersLock.writeLock();
225 removeAndCloseAllDataChangeListenersTemplate();
227 dataChangeListenersLock.unlockWrite(stamp);
231 @SuppressWarnings("checkstyle:IllegalCatch")
232 private void removeAndCloseAllDataChangeListenersTemplate() {
233 dataChangeListeners.values()
234 .forEach(listenerAdapter -> {
236 listenerAdapter.close();
237 } catch (final Exception exception) {
238 LOG.error("Failed to close data-change listener {}.", listenerAdapter, exception);
239 throw new IllegalStateException(String.format("Failed to close data-change listener %s.",
240 listenerAdapter), exception);
243 dataChangeListeners.clear();
247 * Closes and removes all notification listeners.
249 public void removeAndCloseAllNotificationListeners() {
250 final long stamp = notificationListenersLock.writeLock();
252 removeAndCloseAllNotificationListenersTemplate();
254 notificationListenersLock.unlockWrite(stamp);
258 @SuppressWarnings("checkstyle:IllegalCatch")
259 private void removeAndCloseAllNotificationListenersTemplate() {
260 notificationListeners.values()
261 .forEach(listenerAdapter -> {
263 listenerAdapter.close();
264 } catch (final Exception exception) {
265 LOG.error("Failed to close notification listener {}.", listenerAdapter, exception);
266 throw new IllegalStateException(String.format("Failed to close notification listener %s.",
267 listenerAdapter), exception);
270 notificationListeners.clear();
274 * Removes and closes data-change listener of type {@link ListenerAdapter} specified in parameter.
276 * @param listener Listener to be closed and removed.
278 @SuppressWarnings("checkstyle:IllegalCatch")
279 public void removeAndCloseDataChangeListener(final ListenerAdapter listener) {
280 final long stamp = dataChangeListenersLock.writeLock();
282 removeAndCloseDataChangeListenerTemplate(listener);
283 } catch (final Exception exception) {
284 LOG.error("Data-change listener {} cannot be closed.", listener, exception);
286 dataChangeListenersLock.unlockWrite(stamp);
291 * Removes and closes data-change listener of type {@link ListenerAdapter} specified in parameter.
293 * @param listener Listener to be closed and removed.
295 private void removeAndCloseDataChangeListenerTemplate(final ListenerAdapter listener) {
297 requireNonNull(listener).close();
298 if (dataChangeListeners.inverse().remove(listener) == null) {
299 LOG.warn("There isn't any data-change event stream that would match listener adapter {}.", listener);
301 } catch (final InterruptedException | ExecutionException exception) {
302 LOG.error("Data-change listener {} cannot be closed.", listener, exception);
303 throw new IllegalStateException(String.format(
304 "Data-change listener %s cannot be closed.",
305 listener), exception);
310 * Removes and closes notification listener of type {@link NotificationListenerAdapter} specified in parameter.
312 * @param listener Listener to be closed and removed.
314 @SuppressWarnings("checkstyle:IllegalCatch")
315 public void removeAndCloseNotificationListener(final NotificationListenerAdapter listener) {
316 final long stamp = notificationListenersLock.writeLock();
318 removeAndCloseNotificationListenerTemplate(listener);
319 } catch (final Exception exception) {
320 LOG.error("Notification listener {} cannot be closed.", listener, exception);
322 notificationListenersLock.unlockWrite(stamp);
327 * Removes and closes device notification listener of type {@link NotificationListenerAdapter}
328 * specified in parameter.
330 * @param listener Listener to be closed and removed.
332 @SuppressWarnings("checkstyle:IllegalCatch")
333 public void removeAndCloseDeviceNotificationListener(final DeviceNotificationListenerAdaptor listener) {
334 final long stamp = deviceNotificationListenersLock.writeLock();
336 requireNonNull(listener);
337 if (deviceNotificationListeners.inverse().remove(listener) == null) {
338 LOG.warn("There isn't any device notification stream that would match listener adapter {}.", listener);
340 } catch (final Exception exception) {
341 LOG.error("Device Notification listener {} cannot be closed.", listener, exception);
343 deviceNotificationListenersLock.unlockWrite(stamp);
347 private void removeAndCloseNotificationListenerTemplate(final NotificationListenerAdapter listener) {
349 requireNonNull(listener).close();
350 if (notificationListeners.inverse().remove(listener) == null) {
351 LOG.warn("There isn't any notification stream that would match listener adapter {}.", listener);
353 } catch (final InterruptedException | ExecutionException exception) {
354 LOG.error("Notification listener {} cannot be closed.", listener, exception);
355 throw new IllegalStateException(String.format(
356 "Notification listener %s cannot be closed.", listener),
362 * Removal and closing of general listener (data-change or notification listener).
364 * @param listener Listener to be closed and removed from cache.
366 void removeAndCloseListener(final BaseListenerInterface listener) {
367 requireNonNull(listener);
368 if (listener instanceof ListenerAdapter) {
369 removeAndCloseDataChangeListener((ListenerAdapter) listener);
370 } else if (listener instanceof NotificationListenerAdapter) {
371 removeAndCloseNotificationListener((NotificationListenerAdapter) listener);
376 * Creates string representation of stream name from URI. Removes slash from URI in start and end positions,
377 * and optionally {@link URLConstants#BASE_PATH} prefix.
379 * @param uri URI for creation of stream name.
380 * @return String representation of stream name.
382 public static String createStreamNameFromUri(final String uri) {
383 String result = requireNonNull(uri);
385 if (result.startsWith(URLConstants.BASE_PATH)) {
386 result = result.substring(URLConstants.BASE_PATH.length());
387 } else if (result.startsWith("/")) {
388 result = result.substring(1);
393 if (result.endsWith("/")) {
394 result = result.substring(0, result.length() - 1);