c8989e59496ae304ccbf721f75c4e3d3c3023c29
[netconf.git] / netconf / netconf-notifications-impl / src / main / java / org / opendaylight / netconf / notifications / impl / NetconfNotificationManager.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.netconf.notifications.impl;
10
11 import com.google.common.base.Preconditions;
12 import com.google.common.collect.HashMultimap;
13 import com.google.common.collect.HashMultiset;
14 import com.google.common.collect.Lists;
15 import com.google.common.collect.Maps;
16 import com.google.common.collect.Multimap;
17 import com.google.common.collect.Multiset;
18 import com.google.common.collect.Sets;
19 import java.util.Map;
20 import java.util.Set;
21 import javax.annotation.concurrent.GuardedBy;
22 import javax.annotation.concurrent.ThreadSafe;
23 import org.opendaylight.netconf.notifications.BaseNotificationPublisherRegistration;
24 import org.opendaylight.netconf.notifications.NetconfNotification;
25 import org.opendaylight.netconf.notifications.NetconfNotificationCollector;
26 import org.opendaylight.netconf.notifications.NetconfNotificationListener;
27 import org.opendaylight.netconf.notifications.NetconfNotificationRegistry;
28 import org.opendaylight.netconf.notifications.NotificationListenerRegistration;
29 import org.opendaylight.netconf.notifications.NotificationPublisherRegistration;
30 import org.opendaylight.netconf.notifications.NotificationRegistration;
31 import org.opendaylight.netconf.notifications.impl.ops.NotificationsTransformUtil;
32 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.StreamNameType;
33 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.Streams;
34 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.StreamsBuilder;
35 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.streams.Stream;
36 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.streams.StreamBuilder;
37 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.streams.StreamKey;
38 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.notifications.rev120206.NetconfCapabilityChange;
39 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.notifications.rev120206.NetconfSessionEnd;
40 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.notifications.rev120206.NetconfSessionStart;
41 import org.opendaylight.yangtools.yang.binding.Notification;
42 import org.opendaylight.yangtools.yang.model.api.SchemaPath;
43 import org.slf4j.Logger;
44 import org.slf4j.LoggerFactory;
45
46 @ThreadSafe
47 public class NetconfNotificationManager implements NetconfNotificationCollector, NetconfNotificationRegistry,
48         NetconfNotificationListener, AutoCloseable {
49
50     public static final StreamNameType BASE_STREAM_NAME = new StreamNameType("NETCONF");
51     public static final Stream BASE_NETCONF_STREAM;
52
53     static {
54         BASE_NETCONF_STREAM = new StreamBuilder()
55                 .setName(BASE_STREAM_NAME)
56                 .setKey(new StreamKey(BASE_STREAM_NAME))
57                 .setReplaySupport(false)
58                 .setDescription("Default Event Stream")
59                 .build();
60     }
61
62     private static final Logger LOG = LoggerFactory.getLogger(NetconfNotificationManager.class);
63
64     // TODO excessive synchronization provides thread safety but is most likely not optimal
65     // (combination of concurrent collections might improve performance)
66     // And also calling callbacks from a synchronized block is dangerous
67     // since the listeners/publishers can block the whole notification processing
68
69     @GuardedBy("this")
70     private final Multimap<StreamNameType, GenericNotificationListenerReg> notificationListeners =
71             HashMultimap.create();
72
73     @GuardedBy("this")
74     private final Set<NetconfNotificationStreamListener> streamListeners = Sets.newHashSet();
75
76     @GuardedBy("this")
77     private final Map<StreamNameType, Stream> streamMetadata = Maps.newHashMap();
78
79     @GuardedBy("this")
80     private final Multiset<StreamNameType> availableStreams = HashMultiset.create();
81
82     @GuardedBy("this")
83     private final Set<GenericNotificationPublisherReg> notificationPublishers = Sets.newHashSet();
84
85     @Override
86     public synchronized void onNotification(final StreamNameType stream, final NetconfNotification notification) {
87         LOG.debug("Notification of type {} detected", stream);
88         if (LOG.isTraceEnabled()) {
89             LOG.debug("Notification of type {} detected: {}", stream, notification);
90         }
91
92         for (final GenericNotificationListenerReg listenerReg : notificationListeners.get(BASE_STREAM_NAME)) {
93             listenerReg.getListener().onNotification(BASE_STREAM_NAME, notification);
94         }
95     }
96
97     @Override
98     public synchronized NotificationListenerRegistration registerNotificationListener(
99             final StreamNameType stream,
100             final NetconfNotificationListener listener) {
101         Preconditions.checkNotNull(stream);
102         Preconditions.checkNotNull(listener);
103
104         LOG.trace("Notification listener registered for stream: {}", stream);
105
106         final GenericNotificationListenerReg genericNotificationListenerReg =
107                 new GenericNotificationListenerReg(listener) {
108             @Override
109             public void close() {
110                 synchronized (NetconfNotificationManager.this) {
111                     LOG.trace("Notification listener unregistered for stream: {}", stream);
112                     super.close();
113                 }
114             }
115         };
116
117         notificationListeners.put(BASE_STREAM_NAME, genericNotificationListenerReg);
118         return genericNotificationListenerReg;
119     }
120
121     @Override
122     public synchronized Streams getNotificationPublishers() {
123         return new StreamsBuilder().setStream(Lists.newArrayList(streamMetadata.values())).build();
124     }
125
126     @Override
127     public synchronized boolean isStreamAvailable(final StreamNameType streamNameType) {
128         return availableStreams.contains(streamNameType);
129     }
130
131     @Override
132     public synchronized NotificationRegistration registerStreamListener(
133             final NetconfNotificationStreamListener listener) {
134         streamListeners.add(listener);
135
136         // Notify about all already available
137         for (final Stream availableStream : streamMetadata.values()) {
138             listener.onStreamRegistered(availableStream);
139         }
140
141         return new NotificationRegistration() {
142             @Override
143             public void close() {
144                 synchronized (NetconfNotificationManager.this) {
145                     streamListeners.remove(listener);
146                 }
147             }
148         };
149     }
150
151     @Override
152     public synchronized void close() {
153         // Unregister all listeners
154         for (final GenericNotificationListenerReg genericNotificationListenerReg : notificationListeners.values()) {
155             genericNotificationListenerReg.close();
156         }
157         notificationListeners.clear();
158
159         // Unregister all publishers
160         for (final GenericNotificationPublisherReg notificationPublisher : notificationPublishers) {
161             notificationPublisher.close();
162         }
163         notificationPublishers.clear();
164
165         // Clear stream Listeners
166         streamListeners.clear();
167     }
168
169     @Override
170     public synchronized NotificationPublisherRegistration registerNotificationPublisher(final Stream stream) {
171         Preconditions.checkNotNull(stream);
172         final StreamNameType streamName = stream.getName();
173
174         LOG.debug("Notification publisher registered for stream: {}", streamName);
175         if (LOG.isTraceEnabled()) {
176             LOG.trace("Notification publisher registered for stream: {}", stream);
177         }
178
179         if (streamMetadata.containsKey(streamName)) {
180             LOG.warn("Notification stream {} already registered as: {}. Will be reused", streamName,
181                     streamMetadata.get(streamName));
182         } else {
183             streamMetadata.put(streamName, stream);
184         }
185
186         availableStreams.add(streamName);
187
188         final GenericNotificationPublisherReg genericNotificationPublisherReg =
189                 new GenericNotificationPublisherReg(this, streamName) {
190             @Override
191             public void close() {
192                 synchronized (NetconfNotificationManager.this) {
193                     super.close();
194                 }
195             }
196         };
197
198         notificationPublishers.add(genericNotificationPublisherReg);
199
200         notifyStreamAdded(stream);
201         return genericNotificationPublisherReg;
202     }
203
204     private void unregisterNotificationPublisher(
205             final StreamNameType streamName,
206             final GenericNotificationPublisherReg genericNotificationPublisherReg) {
207         availableStreams.remove(streamName);
208         notificationPublishers.remove(genericNotificationPublisherReg);
209
210         LOG.debug("Notification publisher unregistered for stream: {}", streamName);
211
212         // Notify stream listeners if all publishers are gone and also clear metadata for stream
213         if (!isStreamAvailable(streamName)) {
214             LOG.debug("Notification stream: {} became unavailable", streamName);
215             streamMetadata.remove(streamName);
216             notifyStreamRemoved(streamName);
217         }
218     }
219
220     private synchronized void notifyStreamAdded(final Stream stream) {
221         for (final NetconfNotificationStreamListener streamListener : streamListeners) {
222             streamListener.onStreamRegistered(stream);
223         }
224     }
225
226     private synchronized void notifyStreamRemoved(final StreamNameType stream) {
227         for (final NetconfNotificationStreamListener streamListener : streamListeners) {
228             streamListener.onStreamUnregistered(stream);
229         }
230     }
231
232     @Override
233     public BaseNotificationPublisherRegistration registerBaseNotificationPublisher() {
234         final NotificationPublisherRegistration notificationPublisherRegistration =
235                 registerNotificationPublisher(BASE_NETCONF_STREAM);
236         return new BaseNotificationPublisherReg(notificationPublisherRegistration);
237     }
238
239     private static class GenericNotificationPublisherReg implements NotificationPublisherRegistration {
240         private NetconfNotificationManager baseListener;
241         private final StreamNameType registeredStream;
242
243         GenericNotificationPublisherReg(final NetconfNotificationManager baseListener,
244                                         final StreamNameType registeredStream) {
245             this.baseListener = baseListener;
246             this.registeredStream = registeredStream;
247         }
248
249         @Override
250         public void close() {
251             baseListener.unregisterNotificationPublisher(registeredStream, this);
252             baseListener = null;
253         }
254
255         @Override
256         public void onNotification(final StreamNameType stream, final NetconfNotification notification) {
257             Preconditions.checkState(baseListener != null, "Already closed");
258             Preconditions.checkArgument(stream.equals(registeredStream));
259             baseListener.onNotification(stream, notification);
260         }
261     }
262
263     private static class BaseNotificationPublisherReg implements BaseNotificationPublisherRegistration {
264
265         static final SchemaPath CAPABILITY_CHANGE_SCHEMA_PATH = SchemaPath.create(true, NetconfCapabilityChange.QNAME);
266         static final SchemaPath SESSION_START_PATH = SchemaPath.create(true, NetconfSessionStart.QNAME);
267         static final SchemaPath SESSION_END_PATH = SchemaPath.create(true, NetconfSessionEnd.QNAME);
268
269         private final NotificationPublisherRegistration baseRegistration;
270
271         BaseNotificationPublisherReg(final NotificationPublisherRegistration baseRegistration) {
272             this.baseRegistration = baseRegistration;
273         }
274
275         @Override
276         public void close() {
277             baseRegistration.close();
278         }
279
280         private static NetconfNotification serializeNotification(final Notification notification, SchemaPath path) {
281             return NotificationsTransformUtil.transform(notification, path);
282         }
283
284         @Override
285         public void onCapabilityChanged(final NetconfCapabilityChange capabilityChange) {
286             baseRegistration.onNotification(BASE_STREAM_NAME,
287                     serializeNotification(capabilityChange, CAPABILITY_CHANGE_SCHEMA_PATH));
288         }
289
290         @Override
291         public void onSessionStarted(NetconfSessionStart start) {
292             baseRegistration.onNotification(BASE_STREAM_NAME, serializeNotification(start, SESSION_START_PATH));
293         }
294
295         @Override
296         public void onSessionEnded(NetconfSessionEnd end) {
297             baseRegistration.onNotification(BASE_STREAM_NAME, serializeNotification(end, SESSION_END_PATH));
298         }
299     }
300
301     private class GenericNotificationListenerReg implements NotificationListenerRegistration {
302         private final NetconfNotificationListener listener;
303
304         GenericNotificationListenerReg(final NetconfNotificationListener listener) {
305             this.listener = listener;
306         }
307
308         public NetconfNotificationListener getListener() {
309             return listener;
310         }
311
312         @Override
313         public void close() {
314             notificationListeners.remove(BASE_STREAM_NAME, this);
315         }
316     }
317 }