Remove an unused import
[netconf.git] / plugins / netconf-server-mdsal / src / main / java / org / opendaylight / netconf / server / mdsal / notifications / NetconfNotificationManager.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.netconf.server.mdsal.notifications;
9
10 import static com.google.common.base.Preconditions.checkArgument;
11 import static com.google.common.base.Preconditions.checkState;
12 import static java.util.Objects.requireNonNull;
13
14 import com.google.common.collect.HashMultimap;
15 import com.google.common.collect.HashMultiset;
16 import com.google.common.collect.Maps;
17 import com.google.common.collect.Multimap;
18 import com.google.common.collect.Multiset;
19 import java.util.HashMap;
20 import java.util.HashSet;
21 import java.util.List;
22 import java.util.Map;
23 import java.util.Set;
24 import javax.annotation.PreDestroy;
25 import javax.inject.Inject;
26 import javax.inject.Singleton;
27 import org.checkerframework.checker.lock.qual.GuardedBy;
28 import org.eclipse.jdt.annotation.NonNull;
29 import org.opendaylight.mdsal.binding.dom.codec.spi.BindingDOMCodecFactory;
30 import org.opendaylight.mdsal.binding.runtime.api.BindingRuntimeGenerator;
31 import org.opendaylight.netconf.api.messages.NotificationMessage;
32 import org.opendaylight.netconf.server.api.notifications.BaseNotificationPublisherRegistration;
33 import org.opendaylight.netconf.server.api.notifications.NetconfNotificationCollector;
34 import org.opendaylight.netconf.server.api.notifications.NetconfNotificationListener;
35 import org.opendaylight.netconf.server.api.notifications.NetconfNotificationRegistry;
36 import org.opendaylight.netconf.server.api.notifications.NotificationPublisherRegistration;
37 import org.opendaylight.netconf.server.api.notifications.YangLibraryPublisherRegistration;
38 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.StreamNameType;
39 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.Streams;
40 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.StreamsBuilder;
41 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.streams.Stream;
42 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.streams.StreamBuilder;
43 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.streams.StreamKey;
44 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.notifications.rev120206.NetconfCapabilityChange;
45 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.notifications.rev120206.NetconfSessionEnd;
46 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.notifications.rev120206.NetconfSessionStart;
47 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.yang.library.rev190104.YangLibraryChange;
48 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.yang.library.rev190104.YangLibraryUpdate;
49 import org.opendaylight.yangtools.concepts.AbstractObjectRegistration;
50 import org.opendaylight.yangtools.concepts.AbstractRegistration;
51 import org.opendaylight.yangtools.concepts.Registration;
52 import org.opendaylight.yangtools.yang.binding.Notification;
53 import org.opendaylight.yangtools.yang.model.api.stmt.SchemaNodeIdentifier.Absolute;
54 import org.opendaylight.yangtools.yang.parser.api.YangParserException;
55 import org.opendaylight.yangtools.yang.parser.api.YangParserFactory;
56 import org.osgi.service.component.annotations.Activate;
57 import org.osgi.service.component.annotations.Component;
58 import org.osgi.service.component.annotations.Deactivate;
59 import org.osgi.service.component.annotations.Reference;
60 import org.slf4j.Logger;
61 import org.slf4j.LoggerFactory;
62
63 /**
64  *  A thread-safe implementation NetconfNotificationRegistry.
65  */
66 @Singleton
67 @Component(service = { NetconfNotificationCollector.class, NetconfNotificationRegistry.class }, immediate = true,
68            property = "type=netconf-notification-manager")
69 public final class NetconfNotificationManager implements NetconfNotificationCollector, NetconfNotificationRegistry,
70         NetconfNotificationListener, AutoCloseable {
71     public static final StreamNameType BASE_STREAM_NAME = new StreamNameType("NETCONF");
72     public static final Stream BASE_NETCONF_STREAM = new StreamBuilder()
73                 .setName(BASE_STREAM_NAME)
74                 .withKey(new StreamKey(BASE_STREAM_NAME))
75                 .setReplaySupport(false)
76                 .setDescription("Default Event Stream")
77                 .build();
78
79     private static final Logger LOG = LoggerFactory.getLogger(NetconfNotificationManager.class);
80
81     // TODO excessive synchronization provides thread safety but is most likely not optimal
82     // (combination of concurrent collections might improve performance)
83     // And also calling callbacks from a synchronized block is dangerous
84     // since the listeners/publishers can block the whole notification processing
85
86     @GuardedBy("this")
87     private final Multimap<StreamNameType, ListenerReg> notificationListeners = HashMultimap.create();
88
89     @GuardedBy("this")
90     private final Set<StreamListenerReg> streamListeners = new HashSet<>();
91
92     @GuardedBy("this")
93     private final Map<StreamNameType, Stream> streamMetadata = new HashMap<>();
94
95     @GuardedBy("this")
96     private final Multiset<StreamNameType> availableStreams = HashMultiset.create();
97
98     @GuardedBy("this")
99     private final Set<PublisherReg> notificationPublishers = new HashSet<>();
100     private final NotificationsTransformUtil transformUtil;
101
102     @Inject
103     @Activate
104     public NetconfNotificationManager(@Reference final YangParserFactory parserFactory,
105             @Reference final BindingRuntimeGenerator generator, @Reference final BindingDOMCodecFactory codecFactory)
106                 throws YangParserException {
107         transformUtil = new NotificationsTransformUtil(parserFactory, generator, codecFactory);
108     }
109
110     @PreDestroy
111     @Deactivate
112     @Override
113     public synchronized void close() {
114         // Unregister all listeners
115         // Use new list to avoid ConcurrentModificationException when the registration removes itself
116         List.copyOf(notificationListeners.values()).forEach(ListenerReg::close);
117         notificationListeners.clear();
118
119         // Unregister all publishers
120         // Use new list to avoid ConcurrentModificationException when the registration removes itself
121         List.copyOf(notificationPublishers).forEach(PublisherReg::close);
122         notificationPublishers.clear();
123
124         // Clear stream Listeners
125         streamListeners.clear();
126     }
127
128     @Override
129     public synchronized void onNotification(final StreamNameType stream, final NotificationMessage notification) {
130         LOG.debug("Notification of type {} detected", stream);
131         if (LOG.isTraceEnabled()) {
132             LOG.debug("Notification of type {} detected: {}", stream, notification);
133         }
134
135         for (var listenerReg : notificationListeners.get(stream)) {
136             listenerReg.getInstance().onNotification(stream, notification);
137         }
138     }
139
140     @Override
141     public synchronized Registration registerNotificationListener(final StreamNameType stream,
142             final NetconfNotificationListener listener) {
143         final var reg = new ListenerReg(listener, stream);
144         LOG.trace("Notification listener registered for stream: {}", stream);
145         notificationListeners.put(stream, reg);
146         return reg;
147     }
148
149     @Override
150     public synchronized Streams getNotificationPublishers() {
151         return new StreamsBuilder().setStream(Maps.uniqueIndex(streamMetadata.values(), Stream::key)).build();
152     }
153
154     @Override
155     public synchronized boolean isStreamAvailable(final StreamNameType streamNameType) {
156         return availableStreams.contains(streamNameType);
157     }
158
159     @Override
160     public synchronized Registration registerStreamListener(final NetconfNotificationStreamListener listener) {
161         final var reg = new StreamListenerReg(listener);
162         streamListeners.add(reg);
163
164         // Notify about all already available
165         for (var availableStream : streamMetadata.values()) {
166             listener.onStreamRegistered(availableStream);
167         }
168
169         return reg;
170     }
171
172     @Override
173     public synchronized NotificationPublisherRegistration registerNotificationPublisher(final Stream stream) {
174         final var streamName = requireNonNull(stream).getName();
175         final var reg = new PublisherReg(this, streamName);
176
177         LOG.debug("Notification publisher registered for stream: {}", streamName);
178         if (LOG.isTraceEnabled()) {
179             LOG.trace("Notification publisher registered for stream: {}", stream);
180         }
181
182         final var prev = streamMetadata.putIfAbsent(streamName, stream);
183         if (prev != null) {
184             LOG.warn("Notification stream {} already registered as: {}. Will be reused", streamName, prev);
185         }
186
187         availableStreams.add(streamName);
188
189         notificationPublishers.add(reg);
190
191         notifyStreamAdded(stream);
192         return reg;
193     }
194
195     private synchronized void unregisterNotificationPublisher(final StreamNameType streamName, final PublisherReg reg) {
196         availableStreams.remove(streamName);
197         notificationPublishers.remove(reg);
198
199         LOG.debug("Notification publisher unregistered for stream: {}", streamName);
200
201         // Notify stream listeners if all publishers are gone and also clear metadata for stream
202         if (!isStreamAvailable(streamName)) {
203             LOG.debug("Notification stream: {} became unavailable", streamName);
204             streamMetadata.remove(streamName);
205             notifyStreamRemoved(streamName);
206         }
207     }
208
209     private synchronized void notifyStreamAdded(final Stream stream) {
210         for (var streamListener : streamListeners) {
211             streamListener.getInstance().onStreamRegistered(stream);
212         }
213     }
214
215     private synchronized void notifyStreamRemoved(final StreamNameType stream) {
216         for (var streamListener : streamListeners) {
217             streamListener.getInstance().onStreamUnregistered(stream);
218         }
219     }
220
221     @Override
222     public BaseNotificationPublisherRegistration registerBaseNotificationPublisher() {
223         final NotificationPublisherRegistration notificationPublisherRegistration =
224                 registerNotificationPublisher(BASE_NETCONF_STREAM);
225         return new BaseNotificationPublisherReg(transformUtil, notificationPublisherRegistration);
226     }
227
228     @Override
229     public YangLibraryPublisherRegistration registerYangLibraryPublisher() {
230         final NotificationPublisherRegistration notificationPublisherRegistration =
231                 registerNotificationPublisher(BASE_NETCONF_STREAM);
232         return new YangLibraryPublisherReg(transformUtil, notificationPublisherRegistration);
233     }
234
235     private static final class PublisherReg extends AbstractRegistration implements NotificationPublisherRegistration {
236         private final StreamNameType registeredStream;
237
238         private NetconfNotificationManager manager;
239
240         PublisherReg(final NetconfNotificationManager manager, final StreamNameType registeredStream) {
241             this.manager = requireNonNull(manager);
242             this.registeredStream = requireNonNull(registeredStream);
243         }
244
245         @Override
246         public void onNotification(final StreamNameType stream, final NotificationMessage notification) {
247             checkArgument(stream.equals(registeredStream),
248                 "Registered on %s, cannot publish to %s", registeredStream, stream);
249             checkState(notClosed(), "Already closed");
250             manager.onNotification(stream, notification);
251         }
252
253         @Override
254         protected void removeRegistration() {
255             manager.unregisterNotificationPublisher(registeredStream, this);
256             manager = null;
257         }
258     }
259
260     private abstract static class AbstractTransformedRegistration implements Registration {
261         private final NotificationPublisherRegistration delegate;
262         private final NotificationsTransformUtil transformUtil;
263
264         AbstractTransformedRegistration(final NotificationsTransformUtil transformUtil,
265                 final NotificationPublisherRegistration delegate) {
266             this.transformUtil = requireNonNull(transformUtil);
267             this.delegate = requireNonNull(delegate);
268         }
269
270         @Override
271         public final void close() {
272             delegate.close();
273         }
274
275         final void publishNotification(final Notification<?> notification, final Absolute path) {
276             delegate.onNotification(BASE_STREAM_NAME, transformUtil.transform(notification, path));
277         }
278     }
279
280     private static class BaseNotificationPublisherReg extends AbstractTransformedRegistration
281             implements BaseNotificationPublisherRegistration {
282         private static final Absolute CAPABILITY_CHANGE_SCHEMA_PATH = Absolute.of(NetconfCapabilityChange.QNAME);
283         private static final Absolute SESSION_START_PATH = Absolute.of(NetconfSessionStart.QNAME);
284         private static final Absolute SESSION_END_PATH = Absolute.of(NetconfSessionEnd.QNAME);
285
286         BaseNotificationPublisherReg(final NotificationsTransformUtil transformUtil,
287                 final NotificationPublisherRegistration delegate) {
288             super(transformUtil, delegate);
289         }
290
291         @Override
292         public void onCapabilityChanged(final NetconfCapabilityChange capabilityChange) {
293             publishNotification(capabilityChange, CAPABILITY_CHANGE_SCHEMA_PATH);
294         }
295
296         @Override
297         public void onSessionStarted(final NetconfSessionStart start) {
298             publishNotification(start, SESSION_START_PATH);
299         }
300
301         @Override
302         public void onSessionEnded(final NetconfSessionEnd end) {
303             publishNotification(end, SESSION_END_PATH);
304         }
305     }
306
307     private static class YangLibraryPublisherReg extends AbstractTransformedRegistration
308             implements YangLibraryPublisherRegistration {
309         private static final Absolute YANG_LIBRARY_CHANGE_PATH = Absolute.of(YangLibraryChange.QNAME);
310         private static final Absolute YANG_LIBRARY_UPDATE_PATH = Absolute.of(YangLibraryUpdate.QNAME);
311
312         YangLibraryPublisherReg(final NotificationsTransformUtil transformUtil,
313                 final NotificationPublisherRegistration delegate) {
314             super(transformUtil, delegate);
315         }
316
317         @Override
318         @Deprecated
319         public void onYangLibraryChange(final YangLibraryChange yangLibraryChange) {
320             publishNotification(yangLibraryChange, YANG_LIBRARY_CHANGE_PATH);
321         }
322
323         @Override
324         public void onYangLibraryUpdate(final YangLibraryUpdate yangLibraryUpdate) {
325             publishNotification(yangLibraryUpdate, YANG_LIBRARY_UPDATE_PATH);
326         }
327     }
328
329     private final class ListenerReg extends AbstractObjectRegistration<NetconfNotificationListener> {
330         private final StreamNameType stream;
331
332         ListenerReg(final @NonNull NetconfNotificationListener instance, final @NonNull StreamNameType stream) {
333             super(instance);
334             this.stream = requireNonNull(stream);
335         }
336
337         @Override
338         protected void removeRegistration() {
339             synchronized (NetconfNotificationManager.this) {
340                 LOG.trace("Notification listener unregistered for stream: {}", stream);
341                 notificationListeners.remove(stream, this);
342             }
343         }
344     }
345
346     private final class StreamListenerReg extends AbstractObjectRegistration<NetconfNotificationStreamListener> {
347         StreamListenerReg(final @NonNull NetconfNotificationStreamListener instance) {
348             super(instance);
349         }
350
351         @Override
352         protected void removeRegistration() {
353             synchronized (NetconfNotificationManager.this) {
354                 streamListeners.remove(this);
355             }
356         }
357     }
358 }