01ab8411631b661100118054516ac76b9771f854
[netconf.git] / netconf / netconf-notifications-impl / src / main / java / org / opendaylight / netconf / notifications / impl / NetconfNotificationManager.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.netconf.notifications.impl;
9
10 import com.google.common.base.Preconditions;
11 import com.google.common.collect.HashMultimap;
12 import com.google.common.collect.HashMultiset;
13 import com.google.common.collect.Lists;
14 import com.google.common.collect.Multimap;
15 import com.google.common.collect.Multiset;
16 import java.util.HashMap;
17 import java.util.HashSet;
18 import java.util.Map;
19 import java.util.Set;
20 import javax.annotation.concurrent.GuardedBy;
21 import javax.annotation.concurrent.ThreadSafe;
22 import org.opendaylight.netconf.notifications.BaseNotificationPublisherRegistration;
23 import org.opendaylight.netconf.notifications.NetconfNotification;
24 import org.opendaylight.netconf.notifications.NetconfNotificationCollector;
25 import org.opendaylight.netconf.notifications.NetconfNotificationListener;
26 import org.opendaylight.netconf.notifications.NetconfNotificationRegistry;
27 import org.opendaylight.netconf.notifications.NotificationListenerRegistration;
28 import org.opendaylight.netconf.notifications.NotificationPublisherRegistration;
29 import org.opendaylight.netconf.notifications.NotificationRegistration;
30 import org.opendaylight.netconf.notifications.impl.ops.NotificationsTransformUtil;
31 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.StreamNameType;
32 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.Streams;
33 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.StreamsBuilder;
34 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.streams.Stream;
35 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.streams.StreamBuilder;
36 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.streams.StreamKey;
37 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.notifications.rev120206.NetconfCapabilityChange;
38 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.notifications.rev120206.NetconfSessionEnd;
39 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.notifications.rev120206.NetconfSessionStart;
40 import org.opendaylight.yangtools.yang.binding.Notification;
41 import org.opendaylight.yangtools.yang.model.api.SchemaPath;
42 import org.slf4j.Logger;
43 import org.slf4j.LoggerFactory;
44
45 @ThreadSafe
46 public class NetconfNotificationManager implements NetconfNotificationCollector, NetconfNotificationRegistry,
47         NetconfNotificationListener, AutoCloseable {
48
49     public static final StreamNameType BASE_STREAM_NAME = new StreamNameType("NETCONF");
50     public static final Stream BASE_NETCONF_STREAM;
51
52     static {
53         BASE_NETCONF_STREAM = new StreamBuilder()
54                 .setName(BASE_STREAM_NAME)
55                 .withKey(new StreamKey(BASE_STREAM_NAME))
56                 .setReplaySupport(false)
57                 .setDescription("Default Event Stream")
58                 .build();
59     }
60
61     private static final Logger LOG = LoggerFactory.getLogger(NetconfNotificationManager.class);
62
63     // TODO excessive synchronization provides thread safety but is most likely not optimal
64     // (combination of concurrent collections might improve performance)
65     // And also calling callbacks from a synchronized block is dangerous
66     // since the listeners/publishers can block the whole notification processing
67
68     @GuardedBy("this")
69     private final Multimap<StreamNameType, GenericNotificationListenerReg> notificationListeners =
70             HashMultimap.create();
71
72     @GuardedBy("this")
73     private final Set<NetconfNotificationStreamListener> streamListeners = new HashSet<>();
74
75     @GuardedBy("this")
76     private final Map<StreamNameType, Stream> streamMetadata = new HashMap<>();
77
78     @GuardedBy("this")
79     private final Multiset<StreamNameType> availableStreams = HashMultiset.create();
80
81     @GuardedBy("this")
82     private final Set<GenericNotificationPublisherReg> notificationPublishers = new HashSet<>();
83
84     @Override
85     public synchronized void onNotification(final StreamNameType stream, final NetconfNotification notification) {
86         LOG.debug("Notification of type {} detected", stream);
87         if (LOG.isTraceEnabled()) {
88             LOG.debug("Notification of type {} detected: {}", stream, notification);
89         }
90
91         for (final GenericNotificationListenerReg listenerReg : notificationListeners.get(BASE_STREAM_NAME)) {
92             listenerReg.getListener().onNotification(BASE_STREAM_NAME, notification);
93         }
94     }
95
96     @Override
97     public synchronized NotificationListenerRegistration registerNotificationListener(
98             final StreamNameType stream,
99             final NetconfNotificationListener listener) {
100         Preconditions.checkNotNull(stream);
101         Preconditions.checkNotNull(listener);
102
103         LOG.trace("Notification listener registered for stream: {}", stream);
104
105         final GenericNotificationListenerReg genericNotificationListenerReg =
106                 new GenericNotificationListenerReg(listener) {
107             @Override
108             public void close() {
109                 synchronized (NetconfNotificationManager.this) {
110                     LOG.trace("Notification listener unregistered for stream: {}", stream);
111                     super.close();
112                 }
113             }
114         };
115
116         notificationListeners.put(BASE_STREAM_NAME, genericNotificationListenerReg);
117         return genericNotificationListenerReg;
118     }
119
120     @Override
121     public synchronized Streams getNotificationPublishers() {
122         return new StreamsBuilder().setStream(Lists.newArrayList(streamMetadata.values())).build();
123     }
124
125     @Override
126     public synchronized boolean isStreamAvailable(final StreamNameType streamNameType) {
127         return availableStreams.contains(streamNameType);
128     }
129
130     @Override
131     public synchronized NotificationRegistration registerStreamListener(
132             final NetconfNotificationStreamListener listener) {
133         streamListeners.add(listener);
134
135         // Notify about all already available
136         for (final Stream availableStream : streamMetadata.values()) {
137             listener.onStreamRegistered(availableStream);
138         }
139
140         return () -> {
141             synchronized (NetconfNotificationManager.this) {
142                 streamListeners.remove(listener);
143             }
144         };
145     }
146
147     @Override
148     public synchronized void close() {
149         // Unregister all listeners
150         for (final GenericNotificationListenerReg genericNotificationListenerReg : notificationListeners.values()) {
151             genericNotificationListenerReg.close();
152         }
153         notificationListeners.clear();
154
155         // Unregister all publishers
156         for (final GenericNotificationPublisherReg notificationPublisher : notificationPublishers) {
157             notificationPublisher.close();
158         }
159         notificationPublishers.clear();
160
161         // Clear stream Listeners
162         streamListeners.clear();
163     }
164
165     @Override
166     public synchronized NotificationPublisherRegistration registerNotificationPublisher(final Stream stream) {
167         Preconditions.checkNotNull(stream);
168         final StreamNameType streamName = stream.getName();
169
170         LOG.debug("Notification publisher registered for stream: {}", streamName);
171         if (LOG.isTraceEnabled()) {
172             LOG.trace("Notification publisher registered for stream: {}", stream);
173         }
174
175         if (streamMetadata.containsKey(streamName)) {
176             LOG.warn("Notification stream {} already registered as: {}. Will be reused", streamName,
177                     streamMetadata.get(streamName));
178         } else {
179             streamMetadata.put(streamName, stream);
180         }
181
182         availableStreams.add(streamName);
183
184         final GenericNotificationPublisherReg genericNotificationPublisherReg =
185                 new GenericNotificationPublisherReg(this, streamName) {
186             @Override
187             public void close() {
188                 synchronized (NetconfNotificationManager.this) {
189                     super.close();
190                 }
191             }
192         };
193
194         notificationPublishers.add(genericNotificationPublisherReg);
195
196         notifyStreamAdded(stream);
197         return genericNotificationPublisherReg;
198     }
199
200     private void unregisterNotificationPublisher(
201             final StreamNameType streamName,
202             final GenericNotificationPublisherReg genericNotificationPublisherReg) {
203         availableStreams.remove(streamName);
204         notificationPublishers.remove(genericNotificationPublisherReg);
205
206         LOG.debug("Notification publisher unregistered for stream: {}", streamName);
207
208         // Notify stream listeners if all publishers are gone and also clear metadata for stream
209         if (!isStreamAvailable(streamName)) {
210             LOG.debug("Notification stream: {} became unavailable", streamName);
211             streamMetadata.remove(streamName);
212             notifyStreamRemoved(streamName);
213         }
214     }
215
216     private synchronized void notifyStreamAdded(final Stream stream) {
217         for (final NetconfNotificationStreamListener streamListener : streamListeners) {
218             streamListener.onStreamRegistered(stream);
219         }
220     }
221
222     private synchronized void notifyStreamRemoved(final StreamNameType stream) {
223         for (final NetconfNotificationStreamListener streamListener : streamListeners) {
224             streamListener.onStreamUnregistered(stream);
225         }
226     }
227
228     @Override
229     public BaseNotificationPublisherRegistration registerBaseNotificationPublisher() {
230         final NotificationPublisherRegistration notificationPublisherRegistration =
231                 registerNotificationPublisher(BASE_NETCONF_STREAM);
232         return new BaseNotificationPublisherReg(notificationPublisherRegistration);
233     }
234
235     private static class GenericNotificationPublisherReg implements NotificationPublisherRegistration {
236         private NetconfNotificationManager baseListener;
237         private final StreamNameType registeredStream;
238
239         GenericNotificationPublisherReg(final NetconfNotificationManager baseListener,
240                                         final StreamNameType registeredStream) {
241             this.baseListener = baseListener;
242             this.registeredStream = registeredStream;
243         }
244
245         @Override
246         public void close() {
247             baseListener.unregisterNotificationPublisher(registeredStream, this);
248             baseListener = null;
249         }
250
251         @Override
252         public void onNotification(final StreamNameType stream, final NetconfNotification notification) {
253             Preconditions.checkState(baseListener != null, "Already closed");
254             Preconditions.checkArgument(stream.equals(registeredStream));
255             baseListener.onNotification(stream, notification);
256         }
257     }
258
259     private static class BaseNotificationPublisherReg implements BaseNotificationPublisherRegistration {
260
261         static final SchemaPath CAPABILITY_CHANGE_SCHEMA_PATH = SchemaPath.create(true, NetconfCapabilityChange.QNAME);
262         static final SchemaPath SESSION_START_PATH = SchemaPath.create(true, NetconfSessionStart.QNAME);
263         static final SchemaPath SESSION_END_PATH = SchemaPath.create(true, NetconfSessionEnd.QNAME);
264
265         private final NotificationPublisherRegistration baseRegistration;
266
267         BaseNotificationPublisherReg(final NotificationPublisherRegistration baseRegistration) {
268             this.baseRegistration = baseRegistration;
269         }
270
271         @Override
272         public void close() {
273             baseRegistration.close();
274         }
275
276         private static NetconfNotification serializeNotification(final Notification notification,
277                 final SchemaPath path) {
278             return NotificationsTransformUtil.transform(notification, path);
279         }
280
281         @Override
282         public void onCapabilityChanged(final NetconfCapabilityChange capabilityChange) {
283             baseRegistration.onNotification(BASE_STREAM_NAME,
284                     serializeNotification(capabilityChange, CAPABILITY_CHANGE_SCHEMA_PATH));
285         }
286
287         @Override
288         public void onSessionStarted(final NetconfSessionStart start) {
289             baseRegistration.onNotification(BASE_STREAM_NAME, serializeNotification(start, SESSION_START_PATH));
290         }
291
292         @Override
293         public void onSessionEnded(final NetconfSessionEnd end) {
294             baseRegistration.onNotification(BASE_STREAM_NAME, serializeNotification(end, SESSION_END_PATH));
295         }
296     }
297
298     private class GenericNotificationListenerReg implements NotificationListenerRegistration {
299         private final NetconfNotificationListener listener;
300
301         GenericNotificationListenerReg(final NetconfNotificationListener listener) {
302             this.listener = listener;
303         }
304
305         public NetconfNotificationListener getListener() {
306             return listener;
307         }
308
309         @Override
310         public void close() {
311             notificationListeners.remove(BASE_STREAM_NAME, this);
312         }
313     }
314 }