Merge "BUG-2836 Workaround"
[controller.git] / opendaylight / netconf / config-netconf-connector / src / main / java / org / opendaylight / controller / netconf / confignetconfconnector / osgi / YangStoreService.java
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.netconf.confignetconfconnector.osgi;
10
11 import com.google.common.base.Function;
12 import com.google.common.collect.Collections2;
13 import com.google.common.collect.Lists;
14 import com.google.common.collect.Sets;
15 import java.lang.ref.SoftReference;
16 import java.util.Collections;
17 import java.util.HashSet;
18 import java.util.Map;
19 import java.util.Set;
20 import java.util.concurrent.ExecutorService;
21 import java.util.concurrent.Executors;
22 import java.util.concurrent.ThreadFactory;
23 import java.util.concurrent.atomic.AtomicReference;
24 import org.opendaylight.controller.config.yangjmxgenerator.ModuleMXBeanEntry;
25 import org.opendaylight.controller.netconf.api.Capability;
26 import org.opendaylight.controller.netconf.api.monitoring.CapabilityListener;
27 import org.opendaylight.controller.netconf.notifications.BaseNetconfNotificationListener;
28 import org.opendaylight.controller.netconf.notifications.BaseNotificationPublisherRegistration;
29 import org.opendaylight.controller.netconf.notifications.NetconfNotificationCollector;
30 import org.opendaylight.controller.netconf.util.capability.YangModuleCapability;
31 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.Uri;
32 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.notifications.rev120206.NetconfCapabilityChange;
33 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.notifications.rev120206.NetconfCapabilityChangeBuilder;
34 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.notifications.rev120206.changed.by.parms.ChangedByBuilder;
35 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.notifications.rev120206.changed.by.parms.changed.by.server.or.user.ServerBuilder;
36 import org.opendaylight.yangtools.yang.common.QName;
37 import org.opendaylight.yangtools.yang.model.api.Module;
38 import org.opendaylight.yangtools.yang.model.api.ModuleIdentifier;
39 import org.opendaylight.yangtools.yang.model.api.SchemaContextProvider;
40 import org.osgi.framework.BundleContext;
41 import org.osgi.framework.ServiceReference;
42 import org.osgi.util.tracker.ServiceTracker;
43 import org.osgi.util.tracker.ServiceTrackerCustomizer;
44 import org.slf4j.Logger;
45 import org.slf4j.LoggerFactory;
46
47 public class YangStoreService implements YangStoreContext {
48
49     private static final Logger LOG = LoggerFactory.getLogger(YangStoreService.class);
50
51     /**
52      * This is a rather interesting locking model. We need to guard against both the
53      * cache expiring from GC and being invalidated by schema context change. The
54      * context can change while we are doing processing, so we do not want to block
55      * it, so no synchronization can happen on the methods.
56      *
57      * So what we are doing is the following:
58      *
59      * We synchronize with GC as usual, using a SoftReference.
60      *
61      * The atomic reference is used to synchronize with {@link #refresh()}, e.g. when
62      * refresh happens, it will push a SoftReference(null), e.g. simulate the GC. Now
63      * that may happen while the getter is already busy acting on the old schema context,
64      * so it needs to understand that a refresh has happened and retry. To do that, it
65      * attempts a CAS operation -- if it fails, in knows that the SoftReference has
66      * been replaced and thus it needs to retry.
67      *
68      * Note that {@link #getYangStoreSnapshot()} will still use synchronize() internally
69      * to stop multiple threads doing the same work.
70      */
71     private final AtomicReference<SoftReference<YangStoreSnapshot>> ref =
72             new AtomicReference<>(new SoftReference<YangStoreSnapshot>(null));
73
74     private final SchemaContextProvider schemaContextProvider;
75     private final BaseNetconfNotificationListener notificationPublisher;
76
77     private final ExecutorService notificationExecutor = Executors.newSingleThreadExecutor(new ThreadFactory() {
78         @Override
79         public Thread newThread(final Runnable r) {
80             return new Thread(r, "config-netconf-connector-capability-notifications");
81         }
82     });
83
84     private final Set<CapabilityListener> listeners = Collections.synchronizedSet(new HashSet<CapabilityListener>());
85
86     public YangStoreService(final SchemaContextProvider schemaContextProvider, final BundleContext context) {
87         this(schemaContextProvider, new NotificationCollectorTracker(context));
88     }
89
90     public YangStoreService(final SchemaContextProvider schemaContextProvider, final BaseNetconfNotificationListener notificationHandler) {
91         this.schemaContextProvider = schemaContextProvider;
92         this.notificationPublisher = notificationHandler;
93     }
94
95     private synchronized YangStoreContext getYangStoreSnapshot() {
96         SoftReference<YangStoreSnapshot> r = ref.get();
97         YangStoreSnapshot ret = r.get();
98
99         while (ret == null) {
100             // We need to be compute a new value
101             ret = new YangStoreSnapshot(schemaContextProvider.getSchemaContext());
102
103             if (!ref.compareAndSet(r, new SoftReference<>(ret))) {
104                 LOG.debug("Concurrent refresh detected, recomputing snapshot");
105                 r = ref.get();
106                 ret = null;
107             }
108         }
109
110         return ret;
111     }
112
113     @Override
114     public Map<String, Map<String, ModuleMXBeanEntry>> getModuleMXBeanEntryMap() {
115         return getYangStoreSnapshot().getModuleMXBeanEntryMap();
116     }
117
118     @Override
119     public Map<QName, Map<String, ModuleMXBeanEntry>> getQNamesToIdentitiesToModuleMXBeanEntries() {
120         return getYangStoreSnapshot().getQNamesToIdentitiesToModuleMXBeanEntries();
121     }
122
123     @Override
124     public Set<Module> getModules() {
125         return getYangStoreSnapshot().getModules();
126     }
127
128     @Override
129     public String getModuleSource(final ModuleIdentifier moduleIdentifier) {
130         return getYangStoreSnapshot().getModuleSource(moduleIdentifier);
131     }
132
133     public void refresh() {
134         final YangStoreSnapshot previous = ref.get().get();
135         ref.set(new SoftReference<YangStoreSnapshot>(null));
136         notificationExecutor.submit(new CapabilityChangeNotifier(previous));
137     }
138
139     public AutoCloseable registerCapabilityListener(final CapabilityListener listener) {
140         if(ref.get() == null || ref.get().get() == null) {
141             getYangStoreSnapshot();
142         }
143
144         this.listeners.add(listener);
145         listener.onCapabilitiesAdded(NetconfOperationServiceFactoryImpl.setupCapabilities(ref.get().get()));
146
147         return new AutoCloseable() {
148             @Override
149             public void close() throws Exception {
150                 YangStoreService.this.listeners.remove(listener);
151             }
152         };
153     }
154
155     private static final Function<Module, Capability> MODULE_TO_CAPABILITY = new Function<Module, Capability>() {
156         @Override
157         public Capability apply(final Module module) {
158             return new YangModuleCapability(module, module.getSource());
159         }
160     };
161
162     private final class CapabilityChangeNotifier implements Runnable {
163
164         private final YangStoreSnapshot previous;
165
166         public CapabilityChangeNotifier(final YangStoreSnapshot previous) {
167             this.previous = previous;
168         }
169
170         @Override
171         public void run() {
172             final YangStoreContext current = getYangStoreSnapshot();
173
174             if(current.equals(previous) == false) {
175                 final Sets.SetView<Module> removed = Sets.difference(previous.getModules(), current.getModules());
176                 final Sets.SetView<Module> added = Sets.difference(current.getModules(), previous.getModules());
177
178                 // Notify notification manager
179                 notificationPublisher.onCapabilityChanged(computeDiff(removed, added));
180
181                 // Notify direct capability listener TODO would it not be better if the capability listeners went through notification manager ?
182                 for (final CapabilityListener listener : listeners) {
183                     listener.onCapabilitiesAdded(Sets.newHashSet(Collections2.transform(added, MODULE_TO_CAPABILITY)));
184                 }
185                 for (final CapabilityListener listener : listeners) {
186                     listener.onCapabilitiesRemoved(Sets.newHashSet(Collections2.transform(removed, MODULE_TO_CAPABILITY)));
187                 }
188             }
189         }
190     }
191
192     private static final Function<Module, Uri> MODULE_TO_URI = new Function<Module, Uri>() {
193         @Override
194         public Uri apply(final Module input) {
195             return new Uri(new YangModuleCapability(input, input.getSource()).getCapabilityUri());
196         }
197     };
198
199     static NetconfCapabilityChange computeDiff(final Sets.SetView<Module> removed, final Sets.SetView<Module> added) {
200         final NetconfCapabilityChangeBuilder netconfCapabilityChangeBuilder = new NetconfCapabilityChangeBuilder();
201         netconfCapabilityChangeBuilder.setChangedBy(new ChangedByBuilder().setServerOrUser(new ServerBuilder().setServer(true).build()).build());
202         netconfCapabilityChangeBuilder.setDeletedCapability(Lists.newArrayList(Collections2.transform(removed, MODULE_TO_URI)));
203         netconfCapabilityChangeBuilder.setAddedCapability(Lists.newArrayList(Collections2.transform(added, MODULE_TO_URI)));
204         // TODO modified should be computed ... but why ?
205         netconfCapabilityChangeBuilder.setModifiedCapability(Collections.<Uri>emptyList());
206         return netconfCapabilityChangeBuilder.build();
207     }
208
209
210     /**
211      * Looks for NetconfNotificationCollector service and publishes base netconf notifications if possible
212      */
213     private static class NotificationCollectorTracker implements ServiceTrackerCustomizer<NetconfNotificationCollector, NetconfNotificationCollector>, BaseNetconfNotificationListener, AutoCloseable {
214
215         private final BundleContext context;
216         private final ServiceTracker<NetconfNotificationCollector, NetconfNotificationCollector> listenerTracker;
217         private BaseNotificationPublisherRegistration publisherReg;
218
219         public NotificationCollectorTracker(final BundleContext context) {
220             this.context = context;
221             listenerTracker = new ServiceTracker<>(context, NetconfNotificationCollector.class, this);
222             listenerTracker.open();
223         }
224
225         @Override
226         public synchronized NetconfNotificationCollector addingService(final ServiceReference<NetconfNotificationCollector> reference) {
227             closePublisherRegistration();
228             publisherReg = context.getService(reference).registerBaseNotificationPublisher();
229             return null;
230         }
231
232         @Override
233         public synchronized void modifiedService(final ServiceReference<NetconfNotificationCollector> reference, final NetconfNotificationCollector service) {
234             closePublisherRegistration();
235             publisherReg = context.getService(reference).registerBaseNotificationPublisher();
236         }
237
238         @Override
239         public synchronized void removedService(final ServiceReference<NetconfNotificationCollector> reference, final NetconfNotificationCollector service) {
240             closePublisherRegistration();
241             publisherReg = null;
242         }
243
244         private void closePublisherRegistration() {
245             if(publisherReg != null) {
246                 publisherReg.close();
247             }
248         }
249
250         @Override
251         public synchronized void close() {
252             closePublisherRegistration();
253             listenerTracker.close();
254         }
255
256         @Override
257         public void onCapabilityChanged(final NetconfCapabilityChange capabilityChange) {
258             if(publisherReg == null) {
259                 LOG.warn("Omitting notification due to missing notification service: {}", capabilityChange);
260                 return;
261             }
262
263             publisherReg.onCapabilityChanged(capabilityChange);
264         }
265     }
266 }