/* * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved. * * This program and the accompanying materials are made available under the * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ package org.opendaylight.controller.netconf.confignetconfconnector.osgi; import com.google.common.base.Function; import com.google.common.collect.Collections2; import com.google.common.collect.Lists; import com.google.common.collect.Sets; import java.lang.ref.SoftReference; import java.util.Collections; import java.util.HashSet; import java.util.Map; import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; import java.util.concurrent.atomic.AtomicReference; import org.opendaylight.controller.config.yangjmxgenerator.ModuleMXBeanEntry; import org.opendaylight.controller.netconf.api.Capability; import org.opendaylight.controller.netconf.api.monitoring.CapabilityListener; import org.opendaylight.controller.netconf.notifications.BaseNetconfNotificationListener; import org.opendaylight.controller.netconf.notifications.BaseNotificationPublisherRegistration; import org.opendaylight.controller.netconf.notifications.NetconfNotificationCollector; import org.opendaylight.controller.netconf.util.capability.YangModuleCapability; import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.Uri; import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.notifications.rev120206.NetconfCapabilityChange; import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.notifications.rev120206.NetconfCapabilityChangeBuilder; import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.notifications.rev120206.changed.by.parms.ChangedByBuilder; import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.notifications.rev120206.changed.by.parms.changed.by.server.or.user.ServerBuilder; import org.opendaylight.yangtools.yang.common.QName; import org.opendaylight.yangtools.yang.model.api.Module; import org.opendaylight.yangtools.yang.model.api.ModuleIdentifier; import org.opendaylight.yangtools.yang.model.api.SchemaContextProvider; import org.osgi.framework.BundleContext; import org.osgi.framework.ServiceReference; import org.osgi.util.tracker.ServiceTracker; import org.osgi.util.tracker.ServiceTrackerCustomizer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class YangStoreService implements YangStoreContext { private static final Logger LOG = LoggerFactory.getLogger(YangStoreService.class); /** * This is a rather interesting locking model. We need to guard against both the * cache expiring from GC and being invalidated by schema context change. The * context can change while we are doing processing, so we do not want to block * it, so no synchronization can happen on the methods. * * So what we are doing is the following: * * We synchronize with GC as usual, using a SoftReference. * * The atomic reference is used to synchronize with {@link #refresh()}, e.g. when * refresh happens, it will push a SoftReference(null), e.g. simulate the GC. Now * that may happen while the getter is already busy acting on the old schema context, * so it needs to understand that a refresh has happened and retry. To do that, it * attempts a CAS operation -- if it fails, in knows that the SoftReference has * been replaced and thus it needs to retry. * * Note that {@link #getYangStoreSnapshot()} will still use synchronize() internally * to stop multiple threads doing the same work. */ private final AtomicReference> ref = new AtomicReference<>(new SoftReference(null)); private final SchemaContextProvider schemaContextProvider; private final BaseNetconfNotificationListener notificationPublisher; private final ExecutorService notificationExecutor = Executors.newSingleThreadExecutor(new ThreadFactory() { @Override public Thread newThread(final Runnable r) { return new Thread(r, "config-netconf-connector-capability-notifications"); } }); private final Set listeners = Collections.synchronizedSet(new HashSet()); public YangStoreService(final SchemaContextProvider schemaContextProvider, final BundleContext context) { this(schemaContextProvider, new NotificationCollectorTracker(context)); } public YangStoreService(final SchemaContextProvider schemaContextProvider, final BaseNetconfNotificationListener notificationHandler) { this.schemaContextProvider = schemaContextProvider; this.notificationPublisher = notificationHandler; } private synchronized YangStoreContext getYangStoreSnapshot() { SoftReference r = ref.get(); YangStoreSnapshot ret = r.get(); while (ret == null) { // We need to be compute a new value ret = new YangStoreSnapshot(schemaContextProvider.getSchemaContext()); if (!ref.compareAndSet(r, new SoftReference<>(ret))) { LOG.debug("Concurrent refresh detected, recomputing snapshot"); r = ref.get(); ret = null; } } return ret; } @Override public Map> getModuleMXBeanEntryMap() { return getYangStoreSnapshot().getModuleMXBeanEntryMap(); } @Override public Map> getQNamesToIdentitiesToModuleMXBeanEntries() { return getYangStoreSnapshot().getQNamesToIdentitiesToModuleMXBeanEntries(); } @Override public Set getModules() { return getYangStoreSnapshot().getModules(); } @Override public String getModuleSource(final ModuleIdentifier moduleIdentifier) { return getYangStoreSnapshot().getModuleSource(moduleIdentifier); } public void refresh() { final YangStoreSnapshot previous = ref.get().get(); ref.set(new SoftReference(null)); notificationExecutor.submit(new CapabilityChangeNotifier(previous)); } public AutoCloseable registerCapabilityListener(final CapabilityListener listener) { final SoftReference yangStoreSnapshotSoftReference = ref.get(); YangStoreContext ret = yangStoreSnapshotSoftReference != null ? yangStoreSnapshotSoftReference.get() : null; if(ret == null) { ret = getYangStoreSnapshot(); } this.listeners.add(listener); listener.onCapabilitiesAdded(NetconfOperationServiceFactoryImpl.setupCapabilities(ret)); return new AutoCloseable() { @Override public void close() throws Exception { YangStoreService.this.listeners.remove(listener); } }; } private static final Function MODULE_TO_CAPABILITY = new Function() { @Override public Capability apply(final Module module) { return new YangModuleCapability(module, module.getSource()); } }; private final class CapabilityChangeNotifier implements Runnable { private final YangStoreSnapshot previous; public CapabilityChangeNotifier(final YangStoreSnapshot previous) { this.previous = previous; } @Override public void run() { final YangStoreContext current = getYangStoreSnapshot(); if(current.equals(previous) == false) { final Sets.SetView removed = Sets.difference(previous.getModules(), current.getModules()); final Sets.SetView added = Sets.difference(current.getModules(), previous.getModules()); // Notify notification manager notificationPublisher.onCapabilityChanged(computeDiff(removed, added)); // Notify direct capability listener TODO would it not be better if the capability listeners went through notification manager ? for (final CapabilityListener listener : listeners) { listener.onCapabilitiesAdded(Sets.newHashSet(Collections2.transform(added, MODULE_TO_CAPABILITY))); } for (final CapabilityListener listener : listeners) { listener.onCapabilitiesRemoved(Sets.newHashSet(Collections2.transform(removed, MODULE_TO_CAPABILITY))); } } } } private static final Function MODULE_TO_URI = new Function() { @Override public Uri apply(final Module input) { return new Uri(new YangModuleCapability(input, input.getSource()).getCapabilityUri()); } }; static NetconfCapabilityChange computeDiff(final Sets.SetView removed, final Sets.SetView added) { final NetconfCapabilityChangeBuilder netconfCapabilityChangeBuilder = new NetconfCapabilityChangeBuilder(); netconfCapabilityChangeBuilder.setChangedBy(new ChangedByBuilder().setServerOrUser(new ServerBuilder().setServer(true).build()).build()); netconfCapabilityChangeBuilder.setDeletedCapability(Lists.newArrayList(Collections2.transform(removed, MODULE_TO_URI))); netconfCapabilityChangeBuilder.setAddedCapability(Lists.newArrayList(Collections2.transform(added, MODULE_TO_URI))); // TODO modified should be computed ... but why ? netconfCapabilityChangeBuilder.setModifiedCapability(Collections.emptyList()); return netconfCapabilityChangeBuilder.build(); } /** * Looks for NetconfNotificationCollector service and publishes base netconf notifications if possible */ private static class NotificationCollectorTracker implements ServiceTrackerCustomizer, BaseNetconfNotificationListener, AutoCloseable { private final BundleContext context; private final ServiceTracker listenerTracker; private BaseNotificationPublisherRegistration publisherReg; public NotificationCollectorTracker(final BundleContext context) { this.context = context; listenerTracker = new ServiceTracker<>(context, NetconfNotificationCollector.class, this); listenerTracker.open(); } @Override public synchronized NetconfNotificationCollector addingService(final ServiceReference reference) { closePublisherRegistration(); publisherReg = context.getService(reference).registerBaseNotificationPublisher(); return null; } @Override public synchronized void modifiedService(final ServiceReference reference, final NetconfNotificationCollector service) { closePublisherRegistration(); publisherReg = context.getService(reference).registerBaseNotificationPublisher(); } @Override public synchronized void removedService(final ServiceReference reference, final NetconfNotificationCollector service) { closePublisherRegistration(); publisherReg = null; } private void closePublisherRegistration() { if(publisherReg != null) { publisherReg.close(); } } @Override public synchronized void close() { closePublisherRegistration(); listenerTracker.close(); } @Override public void onCapabilityChanged(final NetconfCapabilityChange capabilityChange) { if(publisherReg == null) { LOG.warn("Omitting notification due to missing notification service: {}", capabilityChange); return; } publisherReg.onCapabilityChanged(capabilityChange); } } }