CDS: Add stress test RPC to the cars model
[controller.git] / opendaylight / netconf / config-netconf-connector / src / main / java / org / opendaylight / controller / netconf / confignetconfconnector / osgi / YangStoreService.java
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.netconf.confignetconfconnector.osgi;
10
11 import com.google.common.base.Function;
12 import com.google.common.collect.Collections2;
13 import com.google.common.collect.Lists;
14 import com.google.common.collect.Sets;
15 import java.lang.ref.SoftReference;
16 import java.util.Collections;
17 import java.util.HashSet;
18 import java.util.Map;
19 import java.util.Set;
20 import java.util.concurrent.ExecutorService;
21 import java.util.concurrent.Executors;
22 import java.util.concurrent.ThreadFactory;
23 import java.util.concurrent.atomic.AtomicReference;
24 import org.opendaylight.controller.config.yangjmxgenerator.ModuleMXBeanEntry;
25 import org.opendaylight.controller.netconf.api.Capability;
26 import org.opendaylight.controller.netconf.api.monitoring.CapabilityListener;
27 import org.opendaylight.controller.netconf.notifications.BaseNetconfNotificationListener;
28 import org.opendaylight.controller.netconf.notifications.BaseNotificationPublisherRegistration;
29 import org.opendaylight.controller.netconf.notifications.NetconfNotificationCollector;
30 import org.opendaylight.controller.netconf.util.capability.YangModuleCapability;
31 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.Uri;
32 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.notifications.rev120206.NetconfCapabilityChange;
33 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.notifications.rev120206.NetconfCapabilityChangeBuilder;
34 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.notifications.rev120206.changed.by.parms.ChangedByBuilder;
35 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.notifications.rev120206.changed.by.parms.changed.by.server.or.user.ServerBuilder;
36 import org.opendaylight.yangtools.sal.binding.generator.util.BindingRuntimeContext;
37 import org.opendaylight.yangtools.yang.common.QName;
38 import org.opendaylight.yangtools.yang.model.api.Module;
39 import org.opendaylight.yangtools.yang.model.api.ModuleIdentifier;
40 import org.opendaylight.yangtools.yang.model.api.SchemaContextProvider;
41 import org.osgi.framework.BundleContext;
42 import org.osgi.framework.ServiceReference;
43 import org.osgi.util.tracker.ServiceTracker;
44 import org.osgi.util.tracker.ServiceTrackerCustomizer;
45 import org.slf4j.Logger;
46 import org.slf4j.LoggerFactory;
47
48 public class YangStoreService implements YangStoreContext {
49
50     private static final Logger LOG = LoggerFactory.getLogger(YangStoreService.class);
51
52     /**
53      * This is a rather interesting locking model. We need to guard against both the
54      * cache expiring from GC and being invalidated by schema context change. The
55      * context can change while we are doing processing, so we do not want to block
56      * it, so no synchronization can happen on the methods.
57      *
58      * So what we are doing is the following:
59      *
60      * We synchronize with GC as usual, using a SoftReference.
61      *
62      * The atomic reference is used to synchronize with {@link #refresh(org.opendaylight.yangtools.sal.binding.generator.util.BindingRuntimeContext)}, e.g. when
63      * refresh happens, it will push a SoftReference(null), e.g. simulate the GC. Now
64      * that may happen while the getter is already busy acting on the old schema context,
65      * so it needs to understand that a refresh has happened and retry. To do that, it
66      * attempts a CAS operation -- if it fails, in knows that the SoftReference has
67      * been replaced and thus it needs to retry.
68      *
69      * Note that {@link #getYangStoreSnapshot()} will still use synchronize() internally
70      * to stop multiple threads doing the same work.
71      */
72     private final AtomicReference<SoftReference<YangStoreSnapshot>> ref =
73             new AtomicReference<>(new SoftReference<YangStoreSnapshot>(null));
74
75     private final AtomicReference<SoftReference<BindingRuntimeContext>> refBindingContext =
76             new AtomicReference<>(new SoftReference<BindingRuntimeContext>(null));
77
78     private final SchemaContextProvider schemaContextProvider;
79     private final BaseNetconfNotificationListener notificationPublisher;
80
81     private final ExecutorService notificationExecutor = Executors.newSingleThreadExecutor(new ThreadFactory() {
82         @Override
83         public Thread newThread(final Runnable r) {
84             return new Thread(r, "config-netconf-connector-capability-notifications");
85         }
86     });
87
88     private final Set<CapabilityListener> listeners = Collections.synchronizedSet(new HashSet<CapabilityListener>());
89
90     public YangStoreService(final SchemaContextProvider schemaContextProvider, final BundleContext context) {
91         this(schemaContextProvider, new NotificationCollectorTracker(context));
92     }
93
94     public YangStoreService(final SchemaContextProvider schemaContextProvider, final BaseNetconfNotificationListener notificationHandler) {
95         this.schemaContextProvider = schemaContextProvider;
96         this.notificationPublisher = notificationHandler;
97     }
98
99     private synchronized YangStoreContext getYangStoreSnapshot() {
100         SoftReference<YangStoreSnapshot> r = ref.get();
101         YangStoreSnapshot ret = r.get();
102
103         while (ret == null) {
104             // We need to be compute a new value
105             ret = new YangStoreSnapshot(schemaContextProvider.getSchemaContext(), refBindingContext.get().get());
106
107             if (!ref.compareAndSet(r, new SoftReference<>(ret))) {
108                 LOG.debug("Concurrent refresh detected, recomputing snapshot");
109                 r = ref.get();
110                 ret = null;
111             }
112         }
113
114         return ret;
115     }
116
117     @Override
118     public Map<String, Map<String, ModuleMXBeanEntry>> getModuleMXBeanEntryMap() {
119         return getYangStoreSnapshot().getModuleMXBeanEntryMap();
120     }
121
122     @Override
123     public Map<QName, Map<String, ModuleMXBeanEntry>> getQNamesToIdentitiesToModuleMXBeanEntries() {
124         return getYangStoreSnapshot().getQNamesToIdentitiesToModuleMXBeanEntries();
125     }
126
127     @Override
128     public Set<Module> getModules() {
129         return getYangStoreSnapshot().getModules();
130     }
131
132     @Override
133     public String getModuleSource(final ModuleIdentifier moduleIdentifier) {
134         return getYangStoreSnapshot().getModuleSource(moduleIdentifier);
135     }
136
137     @Override
138     public EnumResolver getEnumResolver() {
139         return getYangStoreSnapshot().getEnumResolver();
140     }
141
142     public void refresh(final BindingRuntimeContext runtimeContext) {
143         final YangStoreSnapshot previous = ref.get().get();
144         ref.set(new SoftReference<YangStoreSnapshot>(null));
145         refBindingContext.set(new SoftReference<>(runtimeContext));
146         notificationExecutor.submit(new CapabilityChangeNotifier(previous));
147     }
148
149     public AutoCloseable registerCapabilityListener(final CapabilityListener listener) {
150
151         YangStoreContext context = ref.get().get();
152
153         if(context == null) {
154             context = getYangStoreSnapshot();
155         }
156
157         this.listeners.add(listener);
158         listener.onCapabilitiesAdded(NetconfOperationServiceFactoryImpl.setupCapabilities(context));
159
160         return new AutoCloseable() {
161             @Override
162             public void close() throws Exception {
163                 YangStoreService.this.listeners.remove(listener);
164             }
165         };
166     }
167
168     private static final Function<Module, Capability> MODULE_TO_CAPABILITY = new Function<Module, Capability>() {
169         @Override
170         public Capability apply(final Module module) {
171             return new YangModuleCapability(module, module.getSource());
172         }
173     };
174
175     private final class CapabilityChangeNotifier implements Runnable {
176
177         private final YangStoreSnapshot previous;
178
179         public CapabilityChangeNotifier(final YangStoreSnapshot previous) {
180             this.previous = previous;
181         }
182
183         @Override
184         public void run() {
185             final YangStoreContext current = getYangStoreSnapshot();
186
187             if(current.equals(previous) == false) {
188                 final Sets.SetView<Module> removed = Sets.difference(previous.getModules(), current.getModules());
189                 final Sets.SetView<Module> added = Sets.difference(current.getModules(), previous.getModules());
190
191                 // Notify notification manager
192                 notificationPublisher.onCapabilityChanged(computeDiff(removed, added));
193
194                 // Notify direct capability listener TODO would it not be better if the capability listeners went through notification manager ?
195                 for (final CapabilityListener listener : listeners) {
196                     listener.onCapabilitiesAdded(Sets.newHashSet(Collections2.transform(added, MODULE_TO_CAPABILITY)));
197                 }
198                 for (final CapabilityListener listener : listeners) {
199                     listener.onCapabilitiesRemoved(Sets.newHashSet(Collections2.transform(removed, MODULE_TO_CAPABILITY)));
200                 }
201             }
202         }
203     }
204
205     private static final Function<Module, Uri> MODULE_TO_URI = new Function<Module, Uri>() {
206         @Override
207         public Uri apply(final Module input) {
208             return new Uri(new YangModuleCapability(input, input.getSource()).getCapabilityUri());
209         }
210     };
211
212     static NetconfCapabilityChange computeDiff(final Sets.SetView<Module> removed, final Sets.SetView<Module> added) {
213         final NetconfCapabilityChangeBuilder netconfCapabilityChangeBuilder = new NetconfCapabilityChangeBuilder();
214         netconfCapabilityChangeBuilder.setChangedBy(new ChangedByBuilder().setServerOrUser(new ServerBuilder().setServer(true).build()).build());
215         netconfCapabilityChangeBuilder.setDeletedCapability(Lists.newArrayList(Collections2.transform(removed, MODULE_TO_URI)));
216         netconfCapabilityChangeBuilder.setAddedCapability(Lists.newArrayList(Collections2.transform(added, MODULE_TO_URI)));
217         // TODO modified should be computed ... but why ?
218         netconfCapabilityChangeBuilder.setModifiedCapability(Collections.<Uri>emptyList());
219         return netconfCapabilityChangeBuilder.build();
220     }
221
222
223     /**
224      * Looks for NetconfNotificationCollector service and publishes base netconf notifications if possible
225      */
226     private static class NotificationCollectorTracker implements ServiceTrackerCustomizer<NetconfNotificationCollector, NetconfNotificationCollector>, BaseNetconfNotificationListener, AutoCloseable {
227
228         private final BundleContext context;
229         private final ServiceTracker<NetconfNotificationCollector, NetconfNotificationCollector> listenerTracker;
230         private BaseNotificationPublisherRegistration publisherReg;
231
232         public NotificationCollectorTracker(final BundleContext context) {
233             this.context = context;
234             listenerTracker = new ServiceTracker<>(context, NetconfNotificationCollector.class, this);
235             listenerTracker.open();
236         }
237
238         @Override
239         public synchronized NetconfNotificationCollector addingService(final ServiceReference<NetconfNotificationCollector> reference) {
240             closePublisherRegistration();
241             publisherReg = context.getService(reference).registerBaseNotificationPublisher();
242             return null;
243         }
244
245         @Override
246         public synchronized void modifiedService(final ServiceReference<NetconfNotificationCollector> reference, final NetconfNotificationCollector service) {
247             closePublisherRegistration();
248             publisherReg = context.getService(reference).registerBaseNotificationPublisher();
249         }
250
251         @Override
252         public synchronized void removedService(final ServiceReference<NetconfNotificationCollector> reference, final NetconfNotificationCollector service) {
253             closePublisherRegistration();
254             publisherReg = null;
255         }
256
257         private void closePublisherRegistration() {
258             if(publisherReg != null) {
259                 publisherReg.close();
260             }
261         }
262
263         @Override
264         public synchronized void close() {
265             closePublisherRegistration();
266             listenerTracker.close();
267         }
268
269         @Override
270         public void onCapabilityChanged(final NetconfCapabilityChange capabilityChange) {
271             if(publisherReg == null) {
272                 LOG.warn("Omitting notification due to missing notification service: {}", capabilityChange);
273                 return;
274             }
275
276             publisherReg.onCapabilityChanged(capabilityChange);
277         }
278     }
279 }