+ * The atomic reference is used to synchronize with {@link #refresh()}, e.g. when
+ * refresh happens, it will push a SoftReference(null), e.g. simulate the GC. Now
+ * that may happen while the getter is already busy acting on the old schema context,
+ * so it needs to understand that a refresh has happened and retry. To do that, it
+ * attempts a CAS operation -- if it fails, in knows that the SoftReference has
+ * been replaced and thus it needs to retry.
+ *
+ * Note that {@link #getYangStoreSnapshot()} will still use synchronize() internally
+ * to stop multiple threads doing the same work.
+ */
+ private final AtomicReference<SoftReference<YangStoreSnapshot>> ref =
+ new AtomicReference<>(new SoftReference<YangStoreSnapshot>(null));
+
+ private final SchemaContextProvider schemaContextProvider;
+ private final BaseNetconfNotificationListener notificationPublisher;
+
+ private final ExecutorService notificationExecutor = Executors.newSingleThreadExecutor(new ThreadFactory() {
+ @Override
+ public Thread newThread(final Runnable r) {
+ return new Thread(r, "config-netconf-connector-capability-notifications");
+ }
+ });
+
+ private final Set<CapabilityListener> listeners = Collections.synchronizedSet(new HashSet<CapabilityListener>());
+
+ public YangStoreService(final SchemaContextProvider schemaContextProvider, final BundleContext context) {
+ this(schemaContextProvider, new NotificationCollectorTracker(context));
+ }
+
+ public YangStoreService(final SchemaContextProvider schemaContextProvider, final BaseNetconfNotificationListener notificationHandler) {
+ this.schemaContextProvider = schemaContextProvider;
+ this.notificationPublisher = notificationHandler;
+ }
+
+ private synchronized YangStoreContext getYangStoreSnapshot() {
+ SoftReference<YangStoreSnapshot> r = ref.get();
+ YangStoreSnapshot ret = r.get();
+
+ while (ret == null) {
+ // We need to be compute a new value
+ ret = new YangStoreSnapshot(schemaContextProvider.getSchemaContext());
+
+ if (!ref.compareAndSet(r, new SoftReference<>(ret))) {
+ LOG.debug("Concurrent refresh detected, recomputing snapshot");
+ r = ref.get();
+ ret = null;
+ }
+ }
+
+ return ret;
+ }
+
+ @Override
+ public Map<String, Map<String, ModuleMXBeanEntry>> getModuleMXBeanEntryMap() {
+ return getYangStoreSnapshot().getModuleMXBeanEntryMap();
+ }
+
+ @Override
+ public Map<QName, Map<String, ModuleMXBeanEntry>> getQNamesToIdentitiesToModuleMXBeanEntries() {
+ return getYangStoreSnapshot().getQNamesToIdentitiesToModuleMXBeanEntries();
+ }
+
+ @Override
+ public Set<Module> getModules() {
+ return getYangStoreSnapshot().getModules();
+ }
+
+ @Override
+ public String getModuleSource(final ModuleIdentifier moduleIdentifier) {
+ return getYangStoreSnapshot().getModuleSource(moduleIdentifier);
+ }
+
+ public void refresh() {
+ final YangStoreSnapshot previous = ref.get().get();
+ ref.set(new SoftReference<YangStoreSnapshot>(null));
+ notificationExecutor.submit(new CapabilityChangeNotifier(previous));
+ }
+
+ public AutoCloseable registerCapabilityListener(final CapabilityListener listener) {
+
+ YangStoreContext context = ref.get().get();
+
+ if(context == null) {
+ context = getYangStoreSnapshot();
+ }
+
+ this.listeners.add(listener);
+ listener.onCapabilitiesAdded(NetconfOperationServiceFactoryImpl.setupCapabilities(context));
+
+ return new AutoCloseable() {
+ @Override
+ public void close() throws Exception {
+ YangStoreService.this.listeners.remove(listener);
+ }
+ };
+ }
+
+ private static final Function<Module, Capability> MODULE_TO_CAPABILITY = new Function<Module, Capability>() {
+ @Override
+ public Capability apply(final Module module) {
+ return new YangModuleCapability(module, module.getSource());
+ }
+ };
+
+ private final class CapabilityChangeNotifier implements Runnable {
+
+ private final YangStoreSnapshot previous;
+
+ public CapabilityChangeNotifier(final YangStoreSnapshot previous) {
+ this.previous = previous;
+ }
+
+ @Override
+ public void run() {
+ final YangStoreContext current = getYangStoreSnapshot();
+
+ if(current.equals(previous) == false) {
+ final Sets.SetView<Module> removed = Sets.difference(previous.getModules(), current.getModules());
+ final Sets.SetView<Module> added = Sets.difference(current.getModules(), previous.getModules());
+
+ // Notify notification manager
+ notificationPublisher.onCapabilityChanged(computeDiff(removed, added));
+
+ // Notify direct capability listener TODO would it not be better if the capability listeners went through notification manager ?
+ for (final CapabilityListener listener : listeners) {
+ listener.onCapabilitiesAdded(Sets.newHashSet(Collections2.transform(added, MODULE_TO_CAPABILITY)));
+ }
+ for (final CapabilityListener listener : listeners) {
+ listener.onCapabilitiesRemoved(Sets.newHashSet(Collections2.transform(removed, MODULE_TO_CAPABILITY)));
+ }
+ }
+ }
+ }
+
+ private static final Function<Module, Uri> MODULE_TO_URI = new Function<Module, Uri>() {
+ @Override
+ public Uri apply(final Module input) {
+ return new Uri(new YangModuleCapability(input, input.getSource()).getCapabilityUri());
+ }
+ };
+
+ static NetconfCapabilityChange computeDiff(final Sets.SetView<Module> removed, final Sets.SetView<Module> added) {
+ final NetconfCapabilityChangeBuilder netconfCapabilityChangeBuilder = new NetconfCapabilityChangeBuilder();
+ netconfCapabilityChangeBuilder.setChangedBy(new ChangedByBuilder().setServerOrUser(new ServerBuilder().setServer(true).build()).build());
+ netconfCapabilityChangeBuilder.setDeletedCapability(Lists.newArrayList(Collections2.transform(removed, MODULE_TO_URI)));
+ netconfCapabilityChangeBuilder.setAddedCapability(Lists.newArrayList(Collections2.transform(added, MODULE_TO_URI)));
+ // TODO modified should be computed ... but why ?
+ netconfCapabilityChangeBuilder.setModifiedCapability(Collections.<Uri>emptyList());
+ return netconfCapabilityChangeBuilder.build();
+ }
+
+
+ /**
+ * Looks for NetconfNotificationCollector service and publishes base netconf notifications if possible