Fixup Augmentable and Identifiable methods changing
[netconf.git] / netconf / netconf-notifications-impl / src / main / java / org / opendaylight / netconf / notifications / impl / NetconfNotificationManager.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.netconf.notifications.impl;
10
11 import com.google.common.base.Preconditions;
12 import com.google.common.collect.HashMultimap;
13 import com.google.common.collect.HashMultiset;
14 import com.google.common.collect.Lists;
15 import com.google.common.collect.Maps;
16 import com.google.common.collect.Multimap;
17 import com.google.common.collect.Multiset;
18 import com.google.common.collect.Sets;
19 import java.util.Map;
20 import java.util.Set;
21 import javax.annotation.concurrent.GuardedBy;
22 import javax.annotation.concurrent.ThreadSafe;
23 import org.opendaylight.netconf.notifications.BaseNotificationPublisherRegistration;
24 import org.opendaylight.netconf.notifications.NetconfNotification;
25 import org.opendaylight.netconf.notifications.NetconfNotificationCollector;
26 import org.opendaylight.netconf.notifications.NetconfNotificationListener;
27 import org.opendaylight.netconf.notifications.NetconfNotificationRegistry;
28 import org.opendaylight.netconf.notifications.NotificationListenerRegistration;
29 import org.opendaylight.netconf.notifications.NotificationPublisherRegistration;
30 import org.opendaylight.netconf.notifications.NotificationRegistration;
31 import org.opendaylight.netconf.notifications.impl.ops.NotificationsTransformUtil;
32 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.StreamNameType;
33 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.Streams;
34 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.StreamsBuilder;
35 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.streams.Stream;
36 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.streams.StreamBuilder;
37 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.streams.StreamKey;
38 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.notifications.rev120206.NetconfCapabilityChange;
39 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.notifications.rev120206.NetconfSessionEnd;
40 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.notifications.rev120206.NetconfSessionStart;
41 import org.opendaylight.yangtools.yang.binding.Notification;
42 import org.opendaylight.yangtools.yang.model.api.SchemaPath;
43 import org.slf4j.Logger;
44 import org.slf4j.LoggerFactory;
45
46 @ThreadSafe
47 public class NetconfNotificationManager implements NetconfNotificationCollector, NetconfNotificationRegistry,
48         NetconfNotificationListener, AutoCloseable {
49
50     public static final StreamNameType BASE_STREAM_NAME = new StreamNameType("NETCONF");
51     public static final Stream BASE_NETCONF_STREAM;
52
53     static {
54         BASE_NETCONF_STREAM = new StreamBuilder()
55                 .setName(BASE_STREAM_NAME)
56                 .withKey(new StreamKey(BASE_STREAM_NAME))
57                 .setReplaySupport(false)
58                 .setDescription("Default Event Stream")
59                 .build();
60     }
61
62     private static final Logger LOG = LoggerFactory.getLogger(NetconfNotificationManager.class);
63
64     // TODO excessive synchronization provides thread safety but is most likely not optimal
65     // (combination of concurrent collections might improve performance)
66     // And also calling callbacks from a synchronized block is dangerous
67     // since the listeners/publishers can block the whole notification processing
68
69     @GuardedBy("this")
70     private final Multimap<StreamNameType, GenericNotificationListenerReg> notificationListeners =
71             HashMultimap.create();
72
73     @GuardedBy("this")
74     private final Set<NetconfNotificationStreamListener> streamListeners = Sets.newHashSet();
75
76     @GuardedBy("this")
77     private final Map<StreamNameType, Stream> streamMetadata = Maps.newHashMap();
78
79     @GuardedBy("this")
80     private final Multiset<StreamNameType> availableStreams = HashMultiset.create();
81
82     @GuardedBy("this")
83     private final Set<GenericNotificationPublisherReg> notificationPublishers = Sets.newHashSet();
84
85     @Override
86     public synchronized void onNotification(final StreamNameType stream, final NetconfNotification notification) {
87         LOG.debug("Notification of type {} detected", stream);
88         if (LOG.isTraceEnabled()) {
89             LOG.debug("Notification of type {} detected: {}", stream, notification);
90         }
91
92         for (final GenericNotificationListenerReg listenerReg : notificationListeners.get(BASE_STREAM_NAME)) {
93             listenerReg.getListener().onNotification(BASE_STREAM_NAME, notification);
94         }
95     }
96
97     @Override
98     public synchronized NotificationListenerRegistration registerNotificationListener(
99             final StreamNameType stream,
100             final NetconfNotificationListener listener) {
101         Preconditions.checkNotNull(stream);
102         Preconditions.checkNotNull(listener);
103
104         LOG.trace("Notification listener registered for stream: {}", stream);
105
106         final GenericNotificationListenerReg genericNotificationListenerReg =
107                 new GenericNotificationListenerReg(listener) {
108             @Override
109             public void close() {
110                 synchronized (NetconfNotificationManager.this) {
111                     LOG.trace("Notification listener unregistered for stream: {}", stream);
112                     super.close();
113                 }
114             }
115         };
116
117         notificationListeners.put(BASE_STREAM_NAME, genericNotificationListenerReg);
118         return genericNotificationListenerReg;
119     }
120
121     @Override
122     public synchronized Streams getNotificationPublishers() {
123         return new StreamsBuilder().setStream(Lists.newArrayList(streamMetadata.values())).build();
124     }
125
126     @Override
127     public synchronized boolean isStreamAvailable(final StreamNameType streamNameType) {
128         return availableStreams.contains(streamNameType);
129     }
130
131     @Override
132     public synchronized NotificationRegistration registerStreamListener(
133             final NetconfNotificationStreamListener listener) {
134         streamListeners.add(listener);
135
136         // Notify about all already available
137         for (final Stream availableStream : streamMetadata.values()) {
138             listener.onStreamRegistered(availableStream);
139         }
140
141         return () -> {
142             synchronized (NetconfNotificationManager.this) {
143                 streamListeners.remove(listener);
144             }
145         };
146     }
147
148     @Override
149     public synchronized void close() {
150         // Unregister all listeners
151         for (final GenericNotificationListenerReg genericNotificationListenerReg : notificationListeners.values()) {
152             genericNotificationListenerReg.close();
153         }
154         notificationListeners.clear();
155
156         // Unregister all publishers
157         for (final GenericNotificationPublisherReg notificationPublisher : notificationPublishers) {
158             notificationPublisher.close();
159         }
160         notificationPublishers.clear();
161
162         // Clear stream Listeners
163         streamListeners.clear();
164     }
165
166     @Override
167     public synchronized NotificationPublisherRegistration registerNotificationPublisher(final Stream stream) {
168         Preconditions.checkNotNull(stream);
169         final StreamNameType streamName = stream.getName();
170
171         LOG.debug("Notification publisher registered for stream: {}", streamName);
172         if (LOG.isTraceEnabled()) {
173             LOG.trace("Notification publisher registered for stream: {}", stream);
174         }
175
176         if (streamMetadata.containsKey(streamName)) {
177             LOG.warn("Notification stream {} already registered as: {}. Will be reused", streamName,
178                     streamMetadata.get(streamName));
179         } else {
180             streamMetadata.put(streamName, stream);
181         }
182
183         availableStreams.add(streamName);
184
185         final GenericNotificationPublisherReg genericNotificationPublisherReg =
186                 new GenericNotificationPublisherReg(this, streamName) {
187             @Override
188             public void close() {
189                 synchronized (NetconfNotificationManager.this) {
190                     super.close();
191                 }
192             }
193         };
194
195         notificationPublishers.add(genericNotificationPublisherReg);
196
197         notifyStreamAdded(stream);
198         return genericNotificationPublisherReg;
199     }
200
201     private void unregisterNotificationPublisher(
202             final StreamNameType streamName,
203             final GenericNotificationPublisherReg genericNotificationPublisherReg) {
204         availableStreams.remove(streamName);
205         notificationPublishers.remove(genericNotificationPublisherReg);
206
207         LOG.debug("Notification publisher unregistered for stream: {}", streamName);
208
209         // Notify stream listeners if all publishers are gone and also clear metadata for stream
210         if (!isStreamAvailable(streamName)) {
211             LOG.debug("Notification stream: {} became unavailable", streamName);
212             streamMetadata.remove(streamName);
213             notifyStreamRemoved(streamName);
214         }
215     }
216
217     private synchronized void notifyStreamAdded(final Stream stream) {
218         for (final NetconfNotificationStreamListener streamListener : streamListeners) {
219             streamListener.onStreamRegistered(stream);
220         }
221     }
222
223     private synchronized void notifyStreamRemoved(final StreamNameType stream) {
224         for (final NetconfNotificationStreamListener streamListener : streamListeners) {
225             streamListener.onStreamUnregistered(stream);
226         }
227     }
228
229     @Override
230     public BaseNotificationPublisherRegistration registerBaseNotificationPublisher() {
231         final NotificationPublisherRegistration notificationPublisherRegistration =
232                 registerNotificationPublisher(BASE_NETCONF_STREAM);
233         return new BaseNotificationPublisherReg(notificationPublisherRegistration);
234     }
235
236     private static class GenericNotificationPublisherReg implements NotificationPublisherRegistration {
237         private NetconfNotificationManager baseListener;
238         private final StreamNameType registeredStream;
239
240         GenericNotificationPublisherReg(final NetconfNotificationManager baseListener,
241                                         final StreamNameType registeredStream) {
242             this.baseListener = baseListener;
243             this.registeredStream = registeredStream;
244         }
245
246         @Override
247         public void close() {
248             baseListener.unregisterNotificationPublisher(registeredStream, this);
249             baseListener = null;
250         }
251
252         @Override
253         public void onNotification(final StreamNameType stream, final NetconfNotification notification) {
254             Preconditions.checkState(baseListener != null, "Already closed");
255             Preconditions.checkArgument(stream.equals(registeredStream));
256             baseListener.onNotification(stream, notification);
257         }
258     }
259
260     private static class BaseNotificationPublisherReg implements BaseNotificationPublisherRegistration {
261
262         static final SchemaPath CAPABILITY_CHANGE_SCHEMA_PATH = SchemaPath.create(true, NetconfCapabilityChange.QNAME);
263         static final SchemaPath SESSION_START_PATH = SchemaPath.create(true, NetconfSessionStart.QNAME);
264         static final SchemaPath SESSION_END_PATH = SchemaPath.create(true, NetconfSessionEnd.QNAME);
265
266         private final NotificationPublisherRegistration baseRegistration;
267
268         BaseNotificationPublisherReg(final NotificationPublisherRegistration baseRegistration) {
269             this.baseRegistration = baseRegistration;
270         }
271
272         @Override
273         public void close() {
274             baseRegistration.close();
275         }
276
277         private static NetconfNotification serializeNotification(final Notification notification,
278                 final SchemaPath path) {
279             return NotificationsTransformUtil.transform(notification, path);
280         }
281
282         @Override
283         public void onCapabilityChanged(final NetconfCapabilityChange capabilityChange) {
284             baseRegistration.onNotification(BASE_STREAM_NAME,
285                     serializeNotification(capabilityChange, CAPABILITY_CHANGE_SCHEMA_PATH));
286         }
287
288         @Override
289         public void onSessionStarted(final NetconfSessionStart start) {
290             baseRegistration.onNotification(BASE_STREAM_NAME, serializeNotification(start, SESSION_START_PATH));
291         }
292
293         @Override
294         public void onSessionEnded(final NetconfSessionEnd end) {
295             baseRegistration.onNotification(BASE_STREAM_NAME, serializeNotification(end, SESSION_END_PATH));
296         }
297     }
298
299     private class GenericNotificationListenerReg implements NotificationListenerRegistration {
300         private final NetconfNotificationListener listener;
301
302         GenericNotificationListenerReg(final NetconfNotificationListener listener) {
303             this.listener = listener;
304         }
305
306         public NetconfNotificationListener getListener() {
307             return listener;
308         }
309
310         @Override
311         public void close() {
312             notificationListeners.remove(BASE_STREAM_NAME, this);
313         }
314     }
315 }