Remove netconf from commons/opendaylight pom
[controller.git] / opendaylight / netconf / netconf-notifications-impl / src / main / java / org / opendaylight / controller / netconf / notifications / impl / NetconfNotificationManager.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.netconf.notifications.impl;
10
11 import com.google.common.base.Preconditions;
12 import com.google.common.collect.HashMultimap;
13 import com.google.common.collect.HashMultiset;
14 import com.google.common.collect.Lists;
15 import com.google.common.collect.Maps;
16 import com.google.common.collect.Multimap;
17 import com.google.common.collect.Multiset;
18 import com.google.common.collect.Sets;
19 import java.util.Map;
20 import java.util.Set;
21 import javax.annotation.concurrent.GuardedBy;
22 import javax.annotation.concurrent.ThreadSafe;
23 import org.opendaylight.controller.netconf.notifications.BaseNotificationPublisherRegistration;
24 import org.opendaylight.controller.netconf.notifications.NetconfNotification;
25 import org.opendaylight.controller.netconf.notifications.NetconfNotificationCollector;
26 import org.opendaylight.controller.netconf.notifications.NetconfNotificationListener;
27 import org.opendaylight.controller.netconf.notifications.NetconfNotificationRegistry;
28 import org.opendaylight.controller.netconf.notifications.NotificationListenerRegistration;
29 import org.opendaylight.controller.netconf.notifications.NotificationPublisherRegistration;
30 import org.opendaylight.controller.netconf.notifications.NotificationRegistration;
31 import org.opendaylight.controller.netconf.notifications.impl.ops.NotificationsTransformUtil;
32 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.StreamNameType;
33 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.Streams;
34 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.StreamsBuilder;
35 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.streams.Stream;
36 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.streams.StreamBuilder;
37 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.streams.StreamKey;
38 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.notifications.rev120206.NetconfCapabilityChange;
39 import org.slf4j.Logger;
40 import org.slf4j.LoggerFactory;
41
42 @ThreadSafe
43 public class NetconfNotificationManager implements NetconfNotificationCollector, NetconfNotificationRegistry, NetconfNotificationListener, AutoCloseable {
44
45     public static final StreamNameType BASE_STREAM_NAME = new StreamNameType("NETCONF");
46     public static final Stream BASE_NETCONF_STREAM;
47
48     static {
49         BASE_NETCONF_STREAM = new StreamBuilder()
50                 .setName(BASE_STREAM_NAME)
51                 .setKey(new StreamKey(BASE_STREAM_NAME))
52                 .setReplaySupport(false)
53                 .setDescription("Default Event Stream")
54                 .build();
55     }
56
57     private static final Logger LOG = LoggerFactory.getLogger(NetconfNotificationManager.class);
58
59     // TODO excessive synchronization provides thread safety but is most likely not optimal (combination of concurrent collections might improve performance)
60     // And also calling callbacks from a synchronized block is dangerous since the listeners/publishers can block the whole notification processing
61
62     @GuardedBy("this")
63     private final Multimap<StreamNameType, GenericNotificationListenerReg> notificationListeners = HashMultimap.create();
64
65     @GuardedBy("this")
66     private final Set<NetconfNotificationStreamListener> streamListeners = Sets.newHashSet();
67
68     @GuardedBy("this")
69     private final Map<StreamNameType, Stream> streamMetadata = Maps.newHashMap();
70
71     @GuardedBy("this")
72     private final Multiset<StreamNameType> availableStreams = HashMultiset.create();
73
74     @GuardedBy("this")
75     private final Set<GenericNotificationPublisherReg> notificationPublishers = Sets.newHashSet();
76
77     @Override
78     public synchronized void onNotification(final StreamNameType stream, final NetconfNotification notification) {
79         LOG.debug("Notification of type {} detected", stream);
80         if (LOG.isTraceEnabled()) {
81             LOG.debug("Notification of type {} detected: {}", stream, notification);
82         }
83
84         for (final GenericNotificationListenerReg listenerReg : notificationListeners.get(BASE_STREAM_NAME)) {
85             listenerReg.getListener().onNotification(BASE_STREAM_NAME, notification);
86         }
87     }
88
89     @Override
90     public synchronized NotificationListenerRegistration registerNotificationListener(final StreamNameType stream, final NetconfNotificationListener listener) {
91         Preconditions.checkNotNull(stream);
92         Preconditions.checkNotNull(listener);
93
94         LOG.trace("Notification listener registered for stream: {}", stream);
95
96         final GenericNotificationListenerReg genericNotificationListenerReg = new GenericNotificationListenerReg(listener) {
97             @Override
98             public void close() {
99                 synchronized (NetconfNotificationManager.this) {
100                     LOG.trace("Notification listener unregistered for stream: {}", stream);
101                     super.close();
102                 }
103             }
104         };
105
106         notificationListeners.put(BASE_STREAM_NAME, genericNotificationListenerReg);
107         return genericNotificationListenerReg;
108     }
109
110     @Override
111     public synchronized Streams getNotificationPublishers() {
112         return new StreamsBuilder().setStream(Lists.newArrayList(streamMetadata.values())).build();
113     }
114
115     @Override
116     public synchronized boolean isStreamAvailable(final StreamNameType streamNameType) {
117         return availableStreams.contains(streamNameType);
118     }
119
120     @Override
121     public synchronized NotificationRegistration registerStreamListener(final NetconfNotificationStreamListener listener) {
122         streamListeners.add(listener);
123
124         // Notify about all already available
125         for (final Stream availableStream : streamMetadata.values()) {
126             listener.onStreamRegistered(availableStream);
127         }
128
129         return new NotificationRegistration() {
130             @Override
131             public void close() {
132                 synchronized (NetconfNotificationManager.this) {
133                     streamListeners.remove(listener);
134                 }
135             }
136         };
137     }
138
139     @Override
140     public synchronized void close() {
141         // Unregister all listeners
142         for (final GenericNotificationListenerReg genericNotificationListenerReg : notificationListeners.values()) {
143             genericNotificationListenerReg.close();
144         }
145         notificationListeners.clear();
146
147         // Unregister all publishers
148         for (final GenericNotificationPublisherReg notificationPublisher : notificationPublishers) {
149             notificationPublisher.close();
150         }
151         notificationPublishers.clear();
152
153         // Clear stream Listeners
154         streamListeners.clear();
155     }
156
157     @Override
158     public synchronized NotificationPublisherRegistration registerNotificationPublisher(final Stream stream) {
159         Preconditions.checkNotNull(stream);
160         final StreamNameType streamName = stream.getName();
161
162         LOG.debug("Notification publisher registered for stream: {}", streamName);
163         if (LOG.isTraceEnabled()) {
164             LOG.trace("Notification publisher registered for stream: {}", stream);
165         }
166
167         if (streamMetadata.containsKey(streamName)) {
168             LOG.warn("Notification stream {} already registered as: {}. Will be reused", streamName, streamMetadata.get(streamName));
169         } else {
170             streamMetadata.put(streamName, stream);
171         }
172
173         availableStreams.add(streamName);
174
175         final GenericNotificationPublisherReg genericNotificationPublisherReg = new GenericNotificationPublisherReg(this, streamName) {
176             @Override
177             public void close() {
178                 synchronized (NetconfNotificationManager.this) {
179                     super.close();
180                 }
181             }
182         };
183
184         notificationPublishers.add(genericNotificationPublisherReg);
185
186         notifyStreamAdded(stream);
187         return genericNotificationPublisherReg;
188     }
189
190     private void unregisterNotificationPublisher(final StreamNameType streamName, final GenericNotificationPublisherReg genericNotificationPublisherReg) {
191         availableStreams.remove(streamName);
192         notificationPublishers.remove(genericNotificationPublisherReg);
193
194         LOG.debug("Notification publisher unregistered for stream: {}", streamName);
195
196         // Notify stream listeners if all publishers are gone and also clear metadata for stream
197         if (!isStreamAvailable(streamName)) {
198             LOG.debug("Notification stream: {} became unavailable", streamName);
199             streamMetadata.remove(streamName);
200             notifyStreamRemoved(streamName);
201         }
202     }
203
204     private synchronized void notifyStreamAdded(final Stream stream) {
205         for (final NetconfNotificationStreamListener streamListener : streamListeners) {
206             streamListener.onStreamRegistered(stream);
207         }
208     }
209
210     private synchronized void notifyStreamRemoved(final StreamNameType stream) {
211         for (final NetconfNotificationStreamListener streamListener : streamListeners) {
212             streamListener.onStreamUnregistered(stream);
213         }
214     }
215
216     @Override
217     public BaseNotificationPublisherRegistration registerBaseNotificationPublisher() {
218         final NotificationPublisherRegistration notificationPublisherRegistration = registerNotificationPublisher(BASE_NETCONF_STREAM);
219         return new BaseNotificationPublisherReg(notificationPublisherRegistration);
220     }
221
222     private static class GenericNotificationPublisherReg implements NotificationPublisherRegistration {
223         private NetconfNotificationManager baseListener;
224         private final StreamNameType registeredStream;
225
226         public GenericNotificationPublisherReg(final NetconfNotificationManager baseListener, final StreamNameType registeredStream) {
227             this.baseListener = baseListener;
228             this.registeredStream = registeredStream;
229         }
230
231         @Override
232         public void close() {
233             baseListener.unregisterNotificationPublisher(registeredStream, this);
234             baseListener = null;
235         }
236
237         @Override
238         public void onNotification(final StreamNameType stream, final NetconfNotification notification) {
239             Preconditions.checkState(baseListener != null, "Already closed");
240             Preconditions.checkArgument(stream.equals(registeredStream));
241             baseListener.onNotification(stream, notification);
242         }
243     }
244
245     private static class BaseNotificationPublisherReg implements BaseNotificationPublisherRegistration {
246
247         private final NotificationPublisherRegistration baseRegistration;
248
249         public BaseNotificationPublisherReg(final NotificationPublisherRegistration baseRegistration) {
250             this.baseRegistration = baseRegistration;
251         }
252
253         @Override
254         public void close() {
255             baseRegistration.close();
256         }
257
258         private static NetconfNotification serializeNotification(final NetconfCapabilityChange capabilityChange) {
259             return NotificationsTransformUtil.transform(capabilityChange);
260         }
261
262         @Override
263         public void onCapabilityChanged(final NetconfCapabilityChange capabilityChange) {
264             baseRegistration.onNotification(BASE_STREAM_NAME, serializeNotification(capabilityChange));
265 //            baseRegistration.onNotification(BASE_STREAM_NAME, serializeNotification(computeDiff(removed, added)));
266         }
267     }
268
269     private class GenericNotificationListenerReg implements NotificationListenerRegistration {
270         private final NetconfNotificationListener listener;
271
272         public GenericNotificationListenerReg(final NetconfNotificationListener listener) {
273             this.listener = listener;
274         }
275
276         public NetconfNotificationListener getListener() {
277             return listener;
278         }
279
280         @Override
281         public void close() {
282             notificationListeners.remove(BASE_STREAM_NAME, this);
283         }
284     }
285 }