Merge "BUG 3057 - notify added event source by topics created before"
[controller.git] / opendaylight / netconf / config-netconf-connector / src / main / java / org / opendaylight / controller / netconf / confignetconfconnector / osgi / YangStoreService.java
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.netconf.confignetconfconnector.osgi;
10
11 import com.google.common.base.Function;
12 import com.google.common.collect.Collections2;
13 import com.google.common.collect.Lists;
14 import com.google.common.collect.Sets;
15 import java.lang.ref.SoftReference;
16 import java.util.Collections;
17 import java.util.HashSet;
18 import java.util.Map;
19 import java.util.Set;
20 import java.util.concurrent.ExecutorService;
21 import java.util.concurrent.Executors;
22 import java.util.concurrent.ThreadFactory;
23 import java.util.concurrent.atomic.AtomicReference;
24 import org.opendaylight.controller.config.yangjmxgenerator.ModuleMXBeanEntry;
25 import org.opendaylight.controller.netconf.api.Capability;
26 import org.opendaylight.controller.netconf.api.monitoring.CapabilityListener;
27 import org.opendaylight.controller.netconf.notifications.BaseNetconfNotificationListener;
28 import org.opendaylight.controller.netconf.notifications.BaseNotificationPublisherRegistration;
29 import org.opendaylight.controller.netconf.notifications.NetconfNotificationCollector;
30 import org.opendaylight.controller.netconf.util.capability.YangModuleCapability;
31 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.Uri;
32 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.notifications.rev120206.NetconfCapabilityChange;
33 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.notifications.rev120206.NetconfCapabilityChangeBuilder;
34 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.notifications.rev120206.changed.by.parms.ChangedByBuilder;
35 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.notifications.rev120206.changed.by.parms.changed.by.server.or.user.ServerBuilder;
36 import org.opendaylight.yangtools.yang.common.QName;
37 import org.opendaylight.yangtools.yang.model.api.Module;
38 import org.opendaylight.yangtools.yang.model.api.ModuleIdentifier;
39 import org.opendaylight.yangtools.yang.model.api.SchemaContextProvider;
40 import org.osgi.framework.BundleContext;
41 import org.osgi.framework.ServiceReference;
42 import org.osgi.util.tracker.ServiceTracker;
43 import org.osgi.util.tracker.ServiceTrackerCustomizer;
44 import org.slf4j.Logger;
45 import org.slf4j.LoggerFactory;
46
47 public class YangStoreService implements YangStoreContext {
48
49     private static final Logger LOG = LoggerFactory.getLogger(YangStoreService.class);
50
51     /**
52      * This is a rather interesting locking model. We need to guard against both the
53      * cache expiring from GC and being invalidated by schema context change. The
54      * context can change while we are doing processing, so we do not want to block
55      * it, so no synchronization can happen on the methods.
56      *
57      * So what we are doing is the following:
58      *
59      * We synchronize with GC as usual, using a SoftReference.
60      *
61      * The atomic reference is used to synchronize with {@link #refresh()}, e.g. when
62      * refresh happens, it will push a SoftReference(null), e.g. simulate the GC. Now
63      * that may happen while the getter is already busy acting on the old schema context,
64      * so it needs to understand that a refresh has happened and retry. To do that, it
65      * attempts a CAS operation -- if it fails, in knows that the SoftReference has
66      * been replaced and thus it needs to retry.
67      *
68      * Note that {@link #getYangStoreSnapshot()} will still use synchronize() internally
69      * to stop multiple threads doing the same work.
70      */
71     private final AtomicReference<SoftReference<YangStoreSnapshot>> ref =
72             new AtomicReference<>(new SoftReference<YangStoreSnapshot>(null));
73
74     private final SchemaContextProvider schemaContextProvider;
75     private final BaseNetconfNotificationListener notificationPublisher;
76
77     private final ExecutorService notificationExecutor = Executors.newSingleThreadExecutor(new ThreadFactory() {
78         @Override
79         public Thread newThread(final Runnable r) {
80             return new Thread(r, "config-netconf-connector-capability-notifications");
81         }
82     });
83
84     private final Set<CapabilityListener> listeners = Collections.synchronizedSet(new HashSet<CapabilityListener>());
85
86     public YangStoreService(final SchemaContextProvider schemaContextProvider, final BundleContext context) {
87         this(schemaContextProvider, new NotificationCollectorTracker(context));
88     }
89
90     public YangStoreService(final SchemaContextProvider schemaContextProvider, final BaseNetconfNotificationListener notificationHandler) {
91         this.schemaContextProvider = schemaContextProvider;
92         this.notificationPublisher = notificationHandler;
93     }
94
95     private synchronized YangStoreContext getYangStoreSnapshot() {
96         SoftReference<YangStoreSnapshot> r = ref.get();
97         YangStoreSnapshot ret = r.get();
98
99         while (ret == null) {
100             // We need to be compute a new value
101             ret = new YangStoreSnapshot(schemaContextProvider.getSchemaContext());
102
103             if (!ref.compareAndSet(r, new SoftReference<>(ret))) {
104                 LOG.debug("Concurrent refresh detected, recomputing snapshot");
105                 r = ref.get();
106                 ret = null;
107             }
108         }
109
110         return ret;
111     }
112
113     @Override
114     public Map<String, Map<String, ModuleMXBeanEntry>> getModuleMXBeanEntryMap() {
115         return getYangStoreSnapshot().getModuleMXBeanEntryMap();
116     }
117
118     @Override
119     public Map<QName, Map<String, ModuleMXBeanEntry>> getQNamesToIdentitiesToModuleMXBeanEntries() {
120         return getYangStoreSnapshot().getQNamesToIdentitiesToModuleMXBeanEntries();
121     }
122
123     @Override
124     public Set<Module> getModules() {
125         return getYangStoreSnapshot().getModules();
126     }
127
128     @Override
129     public String getModuleSource(final ModuleIdentifier moduleIdentifier) {
130         return getYangStoreSnapshot().getModuleSource(moduleIdentifier);
131     }
132
133     public void refresh() {
134         final YangStoreSnapshot previous = ref.get().get();
135         ref.set(new SoftReference<YangStoreSnapshot>(null));
136         notificationExecutor.submit(new CapabilityChangeNotifier(previous));
137     }
138
139     public AutoCloseable registerCapabilityListener(final CapabilityListener listener) {
140
141         YangStoreContext context = ref.get().get();
142
143         if(context == null) {
144             context = getYangStoreSnapshot();
145         }
146
147         this.listeners.add(listener);
148         listener.onCapabilitiesAdded(NetconfOperationServiceFactoryImpl.setupCapabilities(context));
149
150         return new AutoCloseable() {
151             @Override
152             public void close() throws Exception {
153                 YangStoreService.this.listeners.remove(listener);
154             }
155         };
156     }
157
158     private static final Function<Module, Capability> MODULE_TO_CAPABILITY = new Function<Module, Capability>() {
159         @Override
160         public Capability apply(final Module module) {
161             return new YangModuleCapability(module, module.getSource());
162         }
163     };
164
165     private final class CapabilityChangeNotifier implements Runnable {
166
167         private final YangStoreSnapshot previous;
168
169         public CapabilityChangeNotifier(final YangStoreSnapshot previous) {
170             this.previous = previous;
171         }
172
173         @Override
174         public void run() {
175             final YangStoreContext current = getYangStoreSnapshot();
176
177             if(current.equals(previous) == false) {
178                 final Sets.SetView<Module> removed = Sets.difference(previous.getModules(), current.getModules());
179                 final Sets.SetView<Module> added = Sets.difference(current.getModules(), previous.getModules());
180
181                 // Notify notification manager
182                 notificationPublisher.onCapabilityChanged(computeDiff(removed, added));
183
184                 // Notify direct capability listener TODO would it not be better if the capability listeners went through notification manager ?
185                 for (final CapabilityListener listener : listeners) {
186                     listener.onCapabilitiesAdded(Sets.newHashSet(Collections2.transform(added, MODULE_TO_CAPABILITY)));
187                 }
188                 for (final CapabilityListener listener : listeners) {
189                     listener.onCapabilitiesRemoved(Sets.newHashSet(Collections2.transform(removed, MODULE_TO_CAPABILITY)));
190                 }
191             }
192         }
193     }
194
195     private static final Function<Module, Uri> MODULE_TO_URI = new Function<Module, Uri>() {
196         @Override
197         public Uri apply(final Module input) {
198             return new Uri(new YangModuleCapability(input, input.getSource()).getCapabilityUri());
199         }
200     };
201
202     static NetconfCapabilityChange computeDiff(final Sets.SetView<Module> removed, final Sets.SetView<Module> added) {
203         final NetconfCapabilityChangeBuilder netconfCapabilityChangeBuilder = new NetconfCapabilityChangeBuilder();
204         netconfCapabilityChangeBuilder.setChangedBy(new ChangedByBuilder().setServerOrUser(new ServerBuilder().setServer(true).build()).build());
205         netconfCapabilityChangeBuilder.setDeletedCapability(Lists.newArrayList(Collections2.transform(removed, MODULE_TO_URI)));
206         netconfCapabilityChangeBuilder.setAddedCapability(Lists.newArrayList(Collections2.transform(added, MODULE_TO_URI)));
207         // TODO modified should be computed ... but why ?
208         netconfCapabilityChangeBuilder.setModifiedCapability(Collections.<Uri>emptyList());
209         return netconfCapabilityChangeBuilder.build();
210     }
211
212
213     /**
214      * Looks for NetconfNotificationCollector service and publishes base netconf notifications if possible
215      */
216     private static class NotificationCollectorTracker implements ServiceTrackerCustomizer<NetconfNotificationCollector, NetconfNotificationCollector>, BaseNetconfNotificationListener, AutoCloseable {
217
218         private final BundleContext context;
219         private final ServiceTracker<NetconfNotificationCollector, NetconfNotificationCollector> listenerTracker;
220         private BaseNotificationPublisherRegistration publisherReg;
221
222         public NotificationCollectorTracker(final BundleContext context) {
223             this.context = context;
224             listenerTracker = new ServiceTracker<>(context, NetconfNotificationCollector.class, this);
225             listenerTracker.open();
226         }
227
228         @Override
229         public synchronized NetconfNotificationCollector addingService(final ServiceReference<NetconfNotificationCollector> reference) {
230             closePublisherRegistration();
231             publisherReg = context.getService(reference).registerBaseNotificationPublisher();
232             return null;
233         }
234
235         @Override
236         public synchronized void modifiedService(final ServiceReference<NetconfNotificationCollector> reference, final NetconfNotificationCollector service) {
237             closePublisherRegistration();
238             publisherReg = context.getService(reference).registerBaseNotificationPublisher();
239         }
240
241         @Override
242         public synchronized void removedService(final ServiceReference<NetconfNotificationCollector> reference, final NetconfNotificationCollector service) {
243             closePublisherRegistration();
244             publisherReg = null;
245         }
246
247         private void closePublisherRegistration() {
248             if(publisherReg != null) {
249                 publisherReg.close();
250             }
251         }
252
253         @Override
254         public synchronized void close() {
255             closePublisherRegistration();
256             listenerTracker.close();
257         }
258
259         @Override
260         public void onCapabilityChanged(final NetconfCapabilityChange capabilityChange) {
261             if(publisherReg == null) {
262                 LOG.warn("Omitting notification due to missing notification service: {}", capabilityChange);
263                 return;
264             }
265
266             publisherReg.onCapabilityChanged(capabilityChange);
267         }
268     }
269 }