import java.util.concurrent.ExecutionException;
import java.util.concurrent.locks.StampedLock;
import java.util.function.Function;
+import org.opendaylight.mdsal.dom.api.DOMMountPointService;
import org.opendaylight.restconf.nb.rfc8040.rests.utils.RestconfStreamsConstants;
import org.opendaylight.restconf.nb.rfc8040.utils.RestconfConstants;
import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev140708.NotificationOutputTypeGrouping.NotificationOutputType;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
import org.opendaylight.yangtools.yang.model.api.NotificationDefinition;
import org.opendaylight.yangtools.yang.model.api.stmt.SchemaNodeIdentifier.Absolute;
import org.slf4j.Logger;
private final StampedLock dataChangeListenersLock = new StampedLock();
private final StampedLock notificationListenersLock = new StampedLock();
+ private final StampedLock deviceNotificationListenersLock = new StampedLock();
private final BiMap<String, ListenerAdapter> dataChangeListeners = HashBiMap.create();
private final BiMap<String, NotificationListenerAdapter> notificationListeners = HashBiMap.create();
+ private final BiMap<String, DeviceNotificationListenerAdaptor> deviceNotificationListeners = HashBiMap.create();
+
private ListenersBroker() {
}
}
+ /**
+ * Get listener for device path.
+ *
+ * @param path name.
+ * @return {@link NotificationListenerAdapter} or {@link ListenerAdapter} object wrapped in {@link Optional}
+ * or {@link Optional#empty()} if listener with specified path doesn't exist.
+ */
+ public Optional<BaseListenerInterface> getDeviceNotificationListenerFor(final String path) {
+ final long stamp = deviceNotificationListenersLock.readLock();
+ try {
+ return Optional.ofNullable(deviceNotificationListeners.get(requireNonNull(path)));
+ } finally {
+ deviceNotificationListenersLock.unlockRead(stamp);
+ }
+ }
+
/**
* Get listener for stream-name.
*
}
}
+ /**
+ * Creates new {@link DeviceNotificationListenerAdaptor} listener using input stream name and schema path
+ * if such listener haven't been created yet.
+ *
+ * @param streamName Stream name.
+ * @param outputType Specific type of output for notifications - XML or JSON.
+ * @param refSchemaCtx Schema context of node
+ * @param mountPointService Mount point service
+ * @return Created or existing device notification listener adapter.
+ */
+ public DeviceNotificationListenerAdaptor registerDeviceNotificationListener(final String streamName,
+ final NotificationOutputType outputType, final EffectiveModelContext refSchemaCtx,
+ final DOMMountPointService mountPointService, final YangInstanceIdentifier path) {
+
+ final long stamp = deviceNotificationListenersLock.writeLock();
+ try {
+ return deviceNotificationListeners.computeIfAbsent(streamName,
+ stream -> new DeviceNotificationListenerAdaptor(streamName, outputType, refSchemaCtx,
+ mountPointService, path));
+ } finally {
+ deviceNotificationListenersLock.unlockWrite(stamp);
+ }
+ }
+
/**
* Removal and closing of all data-change-event and notification listeners.
*/
}
}
+ /**
+ * Removes and closes device notification listener of type {@link NotificationListenerAdapter}
+ * specified in parameter.
+ *
+ * @param listener Listener to be closed and removed.
+ */
+ @SuppressWarnings("checkstyle:IllegalCatch")
+ public void removeAndCloseDeviceNotificationListener(final DeviceNotificationListenerAdaptor listener) {
+ final long stamp = deviceNotificationListenersLock.writeLock();
+ try {
+ requireNonNull(listener);
+ if (deviceNotificationListeners.inverse().remove(listener) == null) {
+ LOG.warn("There isn't any device notification stream that would match listener adapter {}.", listener);
+ }
+ } catch (final Exception exception) {
+ LOG.error("Device Notification listener {} cannot be closed.", listener, exception);
+ } finally {
+ deviceNotificationListenersLock.unlockWrite(stamp);
+ }
+ }
+
private void removeAndCloseNotificationListenerTemplate(final NotificationListenerAdapter listener) {
try {
requireNonNull(listener).close();