X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fnetconf%2Fconfig-netconf-connector%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fnetconf%2Fconfignetconfconnector%2Fosgi%2FYangStoreService.java;h=29616620920111b394dbe450284a9443b362c8ac;hp=969d7cfdb38bd2d90f7a9a7200412925e1762568;hb=8485b8c8ca9481cc8797b08d56930bbf67c7653d;hpb=89bbb529da1ea9e338b2b03192bd912003f7f00e diff --git a/opendaylight/netconf/config-netconf-connector/src/main/java/org/opendaylight/controller/netconf/confignetconfconnector/osgi/YangStoreService.java b/opendaylight/netconf/config-netconf-connector/src/main/java/org/opendaylight/controller/netconf/confignetconfconnector/osgi/YangStoreService.java index 969d7cfdb3..2961662092 100644 --- a/opendaylight/netconf/config-netconf-connector/src/main/java/org/opendaylight/controller/netconf/confignetconfconnector/osgi/YangStoreService.java +++ b/opendaylight/netconf/config-netconf-connector/src/main/java/org/opendaylight/controller/netconf/confignetconfconnector/osgi/YangStoreService.java @@ -5,18 +5,265 @@ * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ + package org.opendaylight.controller.netconf.confignetconfconnector.osgi; -/** - * Yang store OSGi service - */ -public interface YangStoreService { +import com.google.common.base.Function; +import com.google.common.collect.Collections2; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; +import java.lang.ref.SoftReference; +import java.util.Collections; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.atomic.AtomicReference; +import org.opendaylight.controller.config.yangjmxgenerator.ModuleMXBeanEntry; +import org.opendaylight.controller.netconf.api.Capability; +import org.opendaylight.controller.netconf.api.monitoring.CapabilityListener; +import org.opendaylight.controller.netconf.notifications.BaseNetconfNotificationListener; +import org.opendaylight.controller.netconf.notifications.BaseNotificationPublisherRegistration; +import org.opendaylight.controller.netconf.notifications.NetconfNotificationCollector; +import org.opendaylight.controller.netconf.util.capability.YangModuleCapability; +import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.Uri; +import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.notifications.rev120206.NetconfCapabilityChange; +import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.notifications.rev120206.NetconfCapabilityChangeBuilder; +import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.notifications.rev120206.changed.by.parms.ChangedByBuilder; +import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.notifications.rev120206.changed.by.parms.changed.by.server.or.user.ServerBuilder; +import org.opendaylight.yangtools.yang.common.QName; +import org.opendaylight.yangtools.yang.model.api.Module; +import org.opendaylight.yangtools.yang.model.api.ModuleIdentifier; +import org.opendaylight.yangtools.yang.model.api.SchemaContextProvider; +import org.osgi.framework.BundleContext; +import org.osgi.framework.ServiceReference; +import org.osgi.util.tracker.ServiceTracker; +import org.osgi.util.tracker.ServiceTrackerCustomizer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class YangStoreService implements YangStoreContext { + + private static final Logger LOG = LoggerFactory.getLogger(YangStoreService.class); /** - * Module entry objects mapped to module names and namespaces. + * This is a rather interesting locking model. We need to guard against both the + * cache expiring from GC and being invalidated by schema context change. The + * context can change while we are doing processing, so we do not want to block + * it, so no synchronization can happen on the methods. + * + * So what we are doing is the following: + * + * We synchronize with GC as usual, using a SoftReference. * - * @return actual view of what is available in OSGi service registry. + * The atomic reference is used to synchronize with {@link #refresh()}, e.g. when + * refresh happens, it will push a SoftReference(null), e.g. simulate the GC. Now + * that may happen while the getter is already busy acting on the old schema context, + * so it needs to understand that a refresh has happened and retry. To do that, it + * attempts a CAS operation -- if it fails, in knows that the SoftReference has + * been replaced and thus it needs to retry. + * + * Note that {@link #getYangStoreSnapshot()} will still use synchronize() internally + * to stop multiple threads doing the same work. */ - YangStoreSnapshot getYangStoreSnapshot() throws YangStoreException; + private final AtomicReference> ref = + new AtomicReference<>(new SoftReference(null)); + + private final SchemaContextProvider schemaContextProvider; + private final BaseNetconfNotificationListener notificationPublisher; + + private final ExecutorService notificationExecutor = Executors.newSingleThreadExecutor(new ThreadFactory() { + @Override + public Thread newThread(final Runnable r) { + return new Thread(r, "config-netconf-connector-capability-notifications"); + } + }); + + private final Set listeners = Collections.synchronizedSet(new HashSet()); + + public YangStoreService(final SchemaContextProvider schemaContextProvider, final BundleContext context) { + this(schemaContextProvider, new NotificationCollectorTracker(context)); + } + + public YangStoreService(final SchemaContextProvider schemaContextProvider, final BaseNetconfNotificationListener notificationHandler) { + this.schemaContextProvider = schemaContextProvider; + this.notificationPublisher = notificationHandler; + } + + private synchronized YangStoreContext getYangStoreSnapshot() { + SoftReference r = ref.get(); + YangStoreSnapshot ret = r.get(); + + while (ret == null) { + // We need to be compute a new value + ret = new YangStoreSnapshot(schemaContextProvider.getSchemaContext()); + + if (!ref.compareAndSet(r, new SoftReference<>(ret))) { + LOG.debug("Concurrent refresh detected, recomputing snapshot"); + r = ref.get(); + ret = null; + } + } + + return ret; + } + + @Override + public Map> getModuleMXBeanEntryMap() { + return getYangStoreSnapshot().getModuleMXBeanEntryMap(); + } + + @Override + public Map> getQNamesToIdentitiesToModuleMXBeanEntries() { + return getYangStoreSnapshot().getQNamesToIdentitiesToModuleMXBeanEntries(); + } + + @Override + public Set getModules() { + return getYangStoreSnapshot().getModules(); + } + + @Override + public String getModuleSource(final ModuleIdentifier moduleIdentifier) { + return getYangStoreSnapshot().getModuleSource(moduleIdentifier); + } + + public void refresh() { + final YangStoreSnapshot previous = ref.get().get(); + ref.set(new SoftReference(null)); + notificationExecutor.submit(new CapabilityChangeNotifier(previous)); + } + + public AutoCloseable registerCapabilityListener(final CapabilityListener listener) { + final SoftReference yangStoreSnapshotSoftReference = ref.get(); + + YangStoreContext ret = yangStoreSnapshotSoftReference != null ? yangStoreSnapshotSoftReference.get() : null; + if(ret == null) { + ret = getYangStoreSnapshot(); + } + + this.listeners.add(listener); + listener.onCapabilitiesAdded(NetconfOperationServiceFactoryImpl.setupCapabilities(ret)); + + return new AutoCloseable() { + @Override + public void close() throws Exception { + YangStoreService.this.listeners.remove(listener); + } + }; + } + + private static final Function MODULE_TO_CAPABILITY = new Function() { + @Override + public Capability apply(final Module module) { + return new YangModuleCapability(module, module.getSource()); + } + }; + + private final class CapabilityChangeNotifier implements Runnable { + + private final YangStoreSnapshot previous; + + public CapabilityChangeNotifier(final YangStoreSnapshot previous) { + this.previous = previous; + } + + @Override + public void run() { + final YangStoreContext current = getYangStoreSnapshot(); + + if(current.equals(previous) == false) { + final Sets.SetView removed = Sets.difference(previous.getModules(), current.getModules()); + final Sets.SetView added = Sets.difference(current.getModules(), previous.getModules()); + + // Notify notification manager + notificationPublisher.onCapabilityChanged(computeDiff(removed, added)); + + // Notify direct capability listener TODO would it not be better if the capability listeners went through notification manager ? + for (final CapabilityListener listener : listeners) { + listener.onCapabilitiesAdded(Sets.newHashSet(Collections2.transform(added, MODULE_TO_CAPABILITY))); + } + for (final CapabilityListener listener : listeners) { + listener.onCapabilitiesRemoved(Sets.newHashSet(Collections2.transform(removed, MODULE_TO_CAPABILITY))); + } + } + } + } + + private static final Function MODULE_TO_URI = new Function() { + @Override + public Uri apply(final Module input) { + return new Uri(new YangModuleCapability(input, input.getSource()).getCapabilityUri()); + } + }; + + static NetconfCapabilityChange computeDiff(final Sets.SetView removed, final Sets.SetView added) { + final NetconfCapabilityChangeBuilder netconfCapabilityChangeBuilder = new NetconfCapabilityChangeBuilder(); + netconfCapabilityChangeBuilder.setChangedBy(new ChangedByBuilder().setServerOrUser(new ServerBuilder().setServer(true).build()).build()); + netconfCapabilityChangeBuilder.setDeletedCapability(Lists.newArrayList(Collections2.transform(removed, MODULE_TO_URI))); + netconfCapabilityChangeBuilder.setAddedCapability(Lists.newArrayList(Collections2.transform(added, MODULE_TO_URI))); + // TODO modified should be computed ... but why ? + netconfCapabilityChangeBuilder.setModifiedCapability(Collections.emptyList()); + return netconfCapabilityChangeBuilder.build(); + } + + + /** + * Looks for NetconfNotificationCollector service and publishes base netconf notifications if possible + */ + private static class NotificationCollectorTracker implements ServiceTrackerCustomizer, BaseNetconfNotificationListener, AutoCloseable { + + private final BundleContext context; + private final ServiceTracker listenerTracker; + private BaseNotificationPublisherRegistration publisherReg; + + public NotificationCollectorTracker(final BundleContext context) { + this.context = context; + listenerTracker = new ServiceTracker<>(context, NetconfNotificationCollector.class, this); + listenerTracker.open(); + } + + @Override + public synchronized NetconfNotificationCollector addingService(final ServiceReference reference) { + closePublisherRegistration(); + publisherReg = context.getService(reference).registerBaseNotificationPublisher(); + return null; + } + + @Override + public synchronized void modifiedService(final ServiceReference reference, final NetconfNotificationCollector service) { + closePublisherRegistration(); + publisherReg = context.getService(reference).registerBaseNotificationPublisher(); + } + + @Override + public synchronized void removedService(final ServiceReference reference, final NetconfNotificationCollector service) { + closePublisherRegistration(); + publisherReg = null; + } + + private void closePublisherRegistration() { + if(publisherReg != null) { + publisherReg.close(); + } + } + + @Override + public synchronized void close() { + closePublisherRegistration(); + listenerTracker.close(); + } + + @Override + public void onCapabilityChanged(final NetconfCapabilityChange capabilityChange) { + if(publisherReg == null) { + LOG.warn("Omitting notification due to missing notification service: {}", capabilityChange); + return; + } + publisherReg.onCapabilityChanged(capabilityChange); + } + } }