2 * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.netconf.confignetconfconnector.osgi;
11 import com.google.common.base.Function;
12 import com.google.common.collect.Collections2;
13 import com.google.common.collect.Lists;
14 import com.google.common.collect.Sets;
15 import java.lang.ref.SoftReference;
16 import java.util.Collections;
17 import java.util.HashSet;
20 import java.util.concurrent.ExecutorService;
21 import java.util.concurrent.Executors;
22 import java.util.concurrent.ThreadFactory;
23 import java.util.concurrent.atomic.AtomicReference;
24 import org.opendaylight.controller.config.yangjmxgenerator.ModuleMXBeanEntry;
25 import org.opendaylight.controller.netconf.api.Capability;
26 import org.opendaylight.controller.netconf.api.monitoring.CapabilityListener;
27 import org.opendaylight.controller.netconf.notifications.BaseNetconfNotificationListener;
28 import org.opendaylight.controller.netconf.notifications.BaseNotificationPublisherRegistration;
29 import org.opendaylight.controller.netconf.notifications.NetconfNotificationCollector;
30 import org.opendaylight.controller.netconf.util.capability.YangModuleCapability;
31 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.Uri;
32 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.notifications.rev120206.NetconfCapabilityChange;
33 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.notifications.rev120206.NetconfCapabilityChangeBuilder;
34 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.notifications.rev120206.changed.by.parms.ChangedByBuilder;
35 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.notifications.rev120206.changed.by.parms.changed.by.server.or.user.ServerBuilder;
36 import org.opendaylight.yangtools.yang.common.QName;
37 import org.opendaylight.yangtools.yang.model.api.Module;
38 import org.opendaylight.yangtools.yang.model.api.ModuleIdentifier;
39 import org.opendaylight.yangtools.yang.model.api.SchemaContextProvider;
40 import org.osgi.framework.BundleContext;
41 import org.osgi.framework.ServiceReference;
42 import org.osgi.util.tracker.ServiceTracker;
43 import org.osgi.util.tracker.ServiceTrackerCustomizer;
44 import org.slf4j.Logger;
45 import org.slf4j.LoggerFactory;
47 public class YangStoreService implements YangStoreContext {
49 private static final Logger LOG = LoggerFactory.getLogger(YangStoreService.class);
52 * This is a rather interesting locking model. We need to guard against both the
53 * cache expiring from GC and being invalidated by schema context change. The
54 * context can change while we are doing processing, so we do not want to block
55 * it, so no synchronization can happen on the methods.
57 * So what we are doing is the following:
59 * We synchronize with GC as usual, using a SoftReference.
61 * The atomic reference is used to synchronize with {@link #refresh()}, e.g. when
62 * refresh happens, it will push a SoftReference(null), e.g. simulate the GC. Now
63 * that may happen while the getter is already busy acting on the old schema context,
64 * so it needs to understand that a refresh has happened and retry. To do that, it
65 * attempts a CAS operation -- if it fails, in knows that the SoftReference has
66 * been replaced and thus it needs to retry.
68 * Note that {@link #getYangStoreSnapshot()} will still use synchronize() internally
69 * to stop multiple threads doing the same work.
71 private final AtomicReference<SoftReference<YangStoreSnapshot>> ref =
72 new AtomicReference<>(new SoftReference<YangStoreSnapshot>(null));
74 private final SchemaContextProvider schemaContextProvider;
75 private final BaseNetconfNotificationListener notificationPublisher;
77 private final ExecutorService notificationExecutor = Executors.newSingleThreadExecutor(new ThreadFactory() {
79 public Thread newThread(final Runnable r) {
80 return new Thread(r, "config-netconf-connector-capability-notifications");
84 private final Set<CapabilityListener> listeners = Collections.synchronizedSet(new HashSet<CapabilityListener>());
86 public YangStoreService(final SchemaContextProvider schemaContextProvider, final BundleContext context) {
87 this(schemaContextProvider, new NotificationCollectorTracker(context));
90 public YangStoreService(final SchemaContextProvider schemaContextProvider, final BaseNetconfNotificationListener notificationHandler) {
91 this.schemaContextProvider = schemaContextProvider;
92 this.notificationPublisher = notificationHandler;
95 private synchronized YangStoreContext getYangStoreSnapshot() {
96 SoftReference<YangStoreSnapshot> r = ref.get();
97 YangStoreSnapshot ret = r.get();
100 // We need to be compute a new value
101 ret = new YangStoreSnapshot(schemaContextProvider.getSchemaContext());
103 if (!ref.compareAndSet(r, new SoftReference<>(ret))) {
104 LOG.debug("Concurrent refresh detected, recomputing snapshot");
114 public Map<String, Map<String, ModuleMXBeanEntry>> getModuleMXBeanEntryMap() {
115 return getYangStoreSnapshot().getModuleMXBeanEntryMap();
119 public Map<QName, Map<String, ModuleMXBeanEntry>> getQNamesToIdentitiesToModuleMXBeanEntries() {
120 return getYangStoreSnapshot().getQNamesToIdentitiesToModuleMXBeanEntries();
124 public Set<Module> getModules() {
125 return getYangStoreSnapshot().getModules();
129 public String getModuleSource(final ModuleIdentifier moduleIdentifier) {
130 return getYangStoreSnapshot().getModuleSource(moduleIdentifier);
133 public void refresh() {
134 final YangStoreSnapshot previous = ref.get().get();
135 ref.set(new SoftReference<YangStoreSnapshot>(null));
136 notificationExecutor.submit(new CapabilityChangeNotifier(previous));
139 public AutoCloseable registerCapabilityListener(final CapabilityListener listener) {
140 final SoftReference<YangStoreSnapshot> yangStoreSnapshotSoftReference = ref.get();
142 YangStoreContext ret = yangStoreSnapshotSoftReference != null ? yangStoreSnapshotSoftReference.get() : null;
144 ret = getYangStoreSnapshot();
147 this.listeners.add(listener);
148 listener.onCapabilitiesAdded(NetconfOperationServiceFactoryImpl.setupCapabilities(ret));
150 return new AutoCloseable() {
152 public void close() throws Exception {
153 YangStoreService.this.listeners.remove(listener);
158 private static final Function<Module, Capability> MODULE_TO_CAPABILITY = new Function<Module, Capability>() {
160 public Capability apply(final Module module) {
161 return new YangModuleCapability(module, module.getSource());
165 private final class CapabilityChangeNotifier implements Runnable {
167 private final YangStoreSnapshot previous;
169 public CapabilityChangeNotifier(final YangStoreSnapshot previous) {
170 this.previous = previous;
175 final YangStoreContext current = getYangStoreSnapshot();
177 if(current.equals(previous) == false) {
178 final Sets.SetView<Module> removed = Sets.difference(previous.getModules(), current.getModules());
179 final Sets.SetView<Module> added = Sets.difference(current.getModules(), previous.getModules());
181 // Notify notification manager
182 notificationPublisher.onCapabilityChanged(computeDiff(removed, added));
184 // Notify direct capability listener TODO would it not be better if the capability listeners went through notification manager ?
185 for (final CapabilityListener listener : listeners) {
186 listener.onCapabilitiesAdded(Sets.newHashSet(Collections2.transform(added, MODULE_TO_CAPABILITY)));
188 for (final CapabilityListener listener : listeners) {
189 listener.onCapabilitiesRemoved(Sets.newHashSet(Collections2.transform(removed, MODULE_TO_CAPABILITY)));
195 private static final Function<Module, Uri> MODULE_TO_URI = new Function<Module, Uri>() {
197 public Uri apply(final Module input) {
198 return new Uri(new YangModuleCapability(input, input.getSource()).getCapabilityUri());
202 static NetconfCapabilityChange computeDiff(final Sets.SetView<Module> removed, final Sets.SetView<Module> added) {
203 final NetconfCapabilityChangeBuilder netconfCapabilityChangeBuilder = new NetconfCapabilityChangeBuilder();
204 netconfCapabilityChangeBuilder.setChangedBy(new ChangedByBuilder().setServerOrUser(new ServerBuilder().setServer(true).build()).build());
205 netconfCapabilityChangeBuilder.setDeletedCapability(Lists.newArrayList(Collections2.transform(removed, MODULE_TO_URI)));
206 netconfCapabilityChangeBuilder.setAddedCapability(Lists.newArrayList(Collections2.transform(added, MODULE_TO_URI)));
207 // TODO modified should be computed ... but why ?
208 netconfCapabilityChangeBuilder.setModifiedCapability(Collections.<Uri>emptyList());
209 return netconfCapabilityChangeBuilder.build();
214 * Looks for NetconfNotificationCollector service and publishes base netconf notifications if possible
216 private static class NotificationCollectorTracker implements ServiceTrackerCustomizer<NetconfNotificationCollector, NetconfNotificationCollector>, BaseNetconfNotificationListener, AutoCloseable {
218 private final BundleContext context;
219 private final ServiceTracker<NetconfNotificationCollector, NetconfNotificationCollector> listenerTracker;
220 private BaseNotificationPublisherRegistration publisherReg;
222 public NotificationCollectorTracker(final BundleContext context) {
223 this.context = context;
224 listenerTracker = new ServiceTracker<>(context, NetconfNotificationCollector.class, this);
225 listenerTracker.open();
229 public synchronized NetconfNotificationCollector addingService(final ServiceReference<NetconfNotificationCollector> reference) {
230 closePublisherRegistration();
231 publisherReg = context.getService(reference).registerBaseNotificationPublisher();
236 public synchronized void modifiedService(final ServiceReference<NetconfNotificationCollector> reference, final NetconfNotificationCollector service) {
237 closePublisherRegistration();
238 publisherReg = context.getService(reference).registerBaseNotificationPublisher();
242 public synchronized void removedService(final ServiceReference<NetconfNotificationCollector> reference, final NetconfNotificationCollector service) {
243 closePublisherRegistration();
247 private void closePublisherRegistration() {
248 if(publisherReg != null) {
249 publisherReg.close();
254 public synchronized void close() {
255 closePublisherRegistration();
256 listenerTracker.close();
260 public void onCapabilityChanged(final NetconfCapabilityChange capabilityChange) {
261 if(publisherReg == null) {
262 LOG.warn("Omitting notification due to missing notification service: {}", capabilityChange);
266 publisherReg.onCapabilityChanged(capabilityChange);