2 * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.netconf.server.mdsal.notifications;
10 import static com.google.common.base.Preconditions.checkArgument;
11 import static com.google.common.base.Preconditions.checkState;
12 import static java.util.Objects.requireNonNull;
14 import com.google.common.collect.HashMultimap;
15 import com.google.common.collect.HashMultiset;
16 import com.google.common.collect.Maps;
17 import com.google.common.collect.Multimap;
18 import com.google.common.collect.Multiset;
19 import java.util.HashMap;
20 import java.util.HashSet;
21 import java.util.List;
24 import javax.annotation.PreDestroy;
25 import javax.inject.Inject;
26 import javax.inject.Singleton;
27 import org.checkerframework.checker.lock.qual.GuardedBy;
28 import org.eclipse.jdt.annotation.NonNull;
29 import org.opendaylight.mdsal.binding.dom.codec.spi.BindingDOMCodecFactory;
30 import org.opendaylight.mdsal.binding.runtime.api.BindingRuntimeGenerator;
31 import org.opendaylight.netconf.api.messages.NotificationMessage;
32 import org.opendaylight.netconf.server.api.notifications.BaseNotificationPublisherRegistration;
33 import org.opendaylight.netconf.server.api.notifications.NetconfNotificationCollector;
34 import org.opendaylight.netconf.server.api.notifications.NetconfNotificationCollector.NetconfNotificationStreamListener;
35 import org.opendaylight.netconf.server.api.notifications.NetconfNotificationListener;
36 import org.opendaylight.netconf.server.api.notifications.NetconfNotificationRegistry;
37 import org.opendaylight.netconf.server.api.notifications.NotificationPublisherRegistration;
38 import org.opendaylight.netconf.server.api.notifications.YangLibraryPublisherRegistration;
39 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.StreamNameType;
40 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.Streams;
41 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.StreamsBuilder;
42 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.streams.Stream;
43 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.streams.StreamBuilder;
44 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.streams.StreamKey;
45 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.notifications.rev120206.NetconfCapabilityChange;
46 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.notifications.rev120206.NetconfSessionEnd;
47 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.notifications.rev120206.NetconfSessionStart;
48 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.yang.library.rev190104.YangLibraryChange;
49 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.yang.library.rev190104.YangLibraryUpdate;
50 import org.opendaylight.yangtools.concepts.AbstractObjectRegistration;
51 import org.opendaylight.yangtools.concepts.AbstractRegistration;
52 import org.opendaylight.yangtools.concepts.Registration;
53 import org.opendaylight.yangtools.yang.binding.Notification;
54 import org.opendaylight.yangtools.yang.model.api.stmt.SchemaNodeIdentifier.Absolute;
55 import org.opendaylight.yangtools.yang.parser.api.YangParserException;
56 import org.opendaylight.yangtools.yang.parser.api.YangParserFactory;
57 import org.osgi.service.component.annotations.Activate;
58 import org.osgi.service.component.annotations.Component;
59 import org.osgi.service.component.annotations.Deactivate;
60 import org.osgi.service.component.annotations.Reference;
61 import org.slf4j.Logger;
62 import org.slf4j.LoggerFactory;
65 * A thread-safe implementation NetconfNotificationRegistry.
68 @Component(service = { NetconfNotificationCollector.class, NetconfNotificationRegistry.class }, immediate = true,
69 property = "type=netconf-notification-manager")
70 public final class NetconfNotificationManager implements NetconfNotificationCollector, NetconfNotificationRegistry,
71 NetconfNotificationListener, AutoCloseable {
72 public static final StreamNameType BASE_STREAM_NAME = new StreamNameType("NETCONF");
73 public static final Stream BASE_NETCONF_STREAM = new StreamBuilder()
74 .setName(BASE_STREAM_NAME)
75 .withKey(new StreamKey(BASE_STREAM_NAME))
76 .setReplaySupport(false)
77 .setDescription("Default Event Stream")
80 private static final Logger LOG = LoggerFactory.getLogger(NetconfNotificationManager.class);
82 // TODO excessive synchronization provides thread safety but is most likely not optimal
83 // (combination of concurrent collections might improve performance)
84 // And also calling callbacks from a synchronized block is dangerous
85 // since the listeners/publishers can block the whole notification processing
88 private final Multimap<StreamNameType, ListenerReg> notificationListeners = HashMultimap.create();
91 private final Set<StreamListenerReg> streamListeners = new HashSet<>();
94 private final Map<StreamNameType, Stream> streamMetadata = new HashMap<>();
97 private final Multiset<StreamNameType> availableStreams = HashMultiset.create();
100 private final Set<PublisherReg> notificationPublishers = new HashSet<>();
101 private final NotificationsTransformUtil transformUtil;
105 public NetconfNotificationManager(@Reference final YangParserFactory parserFactory,
106 @Reference final BindingRuntimeGenerator generator, @Reference final BindingDOMCodecFactory codecFactory)
107 throws YangParserException {
108 transformUtil = new NotificationsTransformUtil(parserFactory, generator, codecFactory);
114 public synchronized void close() {
115 // Unregister all listeners
116 // Use new list to avoid ConcurrentModificationException when the registration removes itself
117 List.copyOf(notificationListeners.values()).forEach(ListenerReg::close);
118 notificationListeners.clear();
120 // Unregister all publishers
121 // Use new list to avoid ConcurrentModificationException when the registration removes itself
122 List.copyOf(notificationPublishers).forEach(PublisherReg::close);
123 notificationPublishers.clear();
125 // Clear stream Listeners
126 streamListeners.clear();
130 public synchronized void onNotification(final StreamNameType stream, final NotificationMessage notification) {
131 LOG.debug("Notification of type {} detected", stream);
132 if (LOG.isTraceEnabled()) {
133 LOG.debug("Notification of type {} detected: {}", stream, notification);
136 for (var listenerReg : notificationListeners.get(stream)) {
137 listenerReg.getInstance().onNotification(stream, notification);
142 public synchronized Registration registerNotificationListener(final StreamNameType stream,
143 final NetconfNotificationListener listener) {
144 final var reg = new ListenerReg(listener, stream);
145 LOG.trace("Notification listener registered for stream: {}", stream);
146 notificationListeners.put(stream, reg);
151 public synchronized Streams getNotificationPublishers() {
152 return new StreamsBuilder().setStream(Maps.uniqueIndex(streamMetadata.values(), Stream::key)).build();
156 public synchronized boolean isStreamAvailable(final StreamNameType streamNameType) {
157 return availableStreams.contains(streamNameType);
161 public synchronized Registration registerStreamListener(final NetconfNotificationStreamListener listener) {
162 final var reg = new StreamListenerReg(listener);
163 streamListeners.add(reg);
165 // Notify about all already available
166 for (var availableStream : streamMetadata.values()) {
167 listener.onStreamRegistered(availableStream);
174 public synchronized NotificationPublisherRegistration registerNotificationPublisher(final Stream stream) {
175 final var streamName = requireNonNull(stream).getName();
176 final var reg = new PublisherReg(this, streamName);
178 LOG.debug("Notification publisher registered for stream: {}", streamName);
179 if (LOG.isTraceEnabled()) {
180 LOG.trace("Notification publisher registered for stream: {}", stream);
183 final var prev = streamMetadata.putIfAbsent(streamName, stream);
185 LOG.warn("Notification stream {} already registered as: {}. Will be reused", streamName, prev);
188 availableStreams.add(streamName);
190 notificationPublishers.add(reg);
192 notifyStreamAdded(stream);
196 private synchronized void unregisterNotificationPublisher(final StreamNameType streamName, final PublisherReg reg) {
197 availableStreams.remove(streamName);
198 notificationPublishers.remove(reg);
200 LOG.debug("Notification publisher unregistered for stream: {}", streamName);
202 // Notify stream listeners if all publishers are gone and also clear metadata for stream
203 if (!isStreamAvailable(streamName)) {
204 LOG.debug("Notification stream: {} became unavailable", streamName);
205 streamMetadata.remove(streamName);
206 notifyStreamRemoved(streamName);
210 private synchronized void notifyStreamAdded(final Stream stream) {
211 for (var streamListener : streamListeners) {
212 streamListener.getInstance().onStreamRegistered(stream);
216 private synchronized void notifyStreamRemoved(final StreamNameType stream) {
217 for (var streamListener : streamListeners) {
218 streamListener.getInstance().onStreamUnregistered(stream);
223 public BaseNotificationPublisherRegistration registerBaseNotificationPublisher() {
224 final NotificationPublisherRegistration notificationPublisherRegistration =
225 registerNotificationPublisher(BASE_NETCONF_STREAM);
226 return new BaseNotificationPublisherReg(transformUtil, notificationPublisherRegistration);
230 public YangLibraryPublisherRegistration registerYangLibraryPublisher() {
231 final NotificationPublisherRegistration notificationPublisherRegistration =
232 registerNotificationPublisher(BASE_NETCONF_STREAM);
233 return new YangLibraryPublisherReg(transformUtil, notificationPublisherRegistration);
236 private static final class PublisherReg extends AbstractRegistration implements NotificationPublisherRegistration {
237 private final StreamNameType registeredStream;
239 private NetconfNotificationManager manager;
241 PublisherReg(final NetconfNotificationManager manager, final StreamNameType registeredStream) {
242 this.manager = requireNonNull(manager);
243 this.registeredStream = requireNonNull(registeredStream);
247 public void onNotification(final StreamNameType stream, final NotificationMessage notification) {
248 checkArgument(stream.equals(registeredStream),
249 "Registered on %s, cannot publish to %s", registeredStream, stream);
250 checkState(notClosed(), "Already closed");
251 manager.onNotification(stream, notification);
255 protected void removeRegistration() {
256 manager.unregisterNotificationPublisher(registeredStream, this);
261 private abstract static class AbstractTransformedRegistration implements Registration {
262 private final NotificationPublisherRegistration delegate;
263 private final NotificationsTransformUtil transformUtil;
265 AbstractTransformedRegistration(final NotificationsTransformUtil transformUtil,
266 final NotificationPublisherRegistration delegate) {
267 this.transformUtil = requireNonNull(transformUtil);
268 this.delegate = requireNonNull(delegate);
272 public final void close() {
276 final void publishNotification(final Notification<?> notification, final Absolute path) {
277 delegate.onNotification(BASE_STREAM_NAME, transformUtil.transform(notification, path));
281 private static class BaseNotificationPublisherReg extends AbstractTransformedRegistration
282 implements BaseNotificationPublisherRegistration {
283 private static final Absolute CAPABILITY_CHANGE_SCHEMA_PATH = Absolute.of(NetconfCapabilityChange.QNAME);
284 private static final Absolute SESSION_START_PATH = Absolute.of(NetconfSessionStart.QNAME);
285 private static final Absolute SESSION_END_PATH = Absolute.of(NetconfSessionEnd.QNAME);
287 BaseNotificationPublisherReg(final NotificationsTransformUtil transformUtil,
288 final NotificationPublisherRegistration delegate) {
289 super(transformUtil, delegate);
293 public void onCapabilityChanged(final NetconfCapabilityChange capabilityChange) {
294 publishNotification(capabilityChange, CAPABILITY_CHANGE_SCHEMA_PATH);
298 public void onSessionStarted(final NetconfSessionStart start) {
299 publishNotification(start, SESSION_START_PATH);
303 public void onSessionEnded(final NetconfSessionEnd end) {
304 publishNotification(end, SESSION_END_PATH);
308 private static class YangLibraryPublisherReg extends AbstractTransformedRegistration
309 implements YangLibraryPublisherRegistration {
310 private static final Absolute YANG_LIBRARY_CHANGE_PATH = Absolute.of(YangLibraryChange.QNAME);
311 private static final Absolute YANG_LIBRARY_UPDATE_PATH = Absolute.of(YangLibraryUpdate.QNAME);
313 YangLibraryPublisherReg(final NotificationsTransformUtil transformUtil,
314 final NotificationPublisherRegistration delegate) {
315 super(transformUtil, delegate);
320 public void onYangLibraryChange(final YangLibraryChange yangLibraryChange) {
321 publishNotification(yangLibraryChange, YANG_LIBRARY_CHANGE_PATH);
325 public void onYangLibraryUpdate(final YangLibraryUpdate yangLibraryUpdate) {
326 publishNotification(yangLibraryUpdate, YANG_LIBRARY_UPDATE_PATH);
330 private final class ListenerReg extends AbstractObjectRegistration<NetconfNotificationListener> {
331 private final StreamNameType stream;
333 ListenerReg(final @NonNull NetconfNotificationListener instance, final @NonNull StreamNameType stream) {
335 this.stream = requireNonNull(stream);
339 protected void removeRegistration() {
340 synchronized (NetconfNotificationManager.this) {
341 LOG.trace("Notification listener unregistered for stream: {}", stream);
342 notificationListeners.remove(stream, this);
347 private final class StreamListenerReg extends AbstractObjectRegistration<NetconfNotificationStreamListener> {
348 StreamListenerReg(final @NonNull NetconfNotificationStreamListener instance) {
353 protected void removeRegistration() {
354 synchronized (NetconfNotificationManager.this) {
355 streamListeners.remove(this);