Fixed bug in Data store where multiple readers could overwrite
[controller.git] / opendaylight / md-sal / sal-dom-broker / src / main / java / org / opendaylight / controller / sal / dom / broker / SchemaServiceImpl.java
1 package org.opendaylight.controller.sal.dom.broker;
2
3 import java.io.IOException;
4 import java.io.InputStream;
5 import java.net.URL;
6 import java.util.ArrayList;
7 import java.util.Collection;
8 import java.util.Enumeration;
9 import java.util.List;
10 import java.util.Set;
11 import java.util.zip.Checksum;
12
13 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
14 import org.osgi.util.tracker.BundleTracker;
15 import org.osgi.util.tracker.BundleTrackerCustomizer;
16 import org.osgi.util.tracker.ServiceTracker;
17 import org.osgi.util.tracker.ServiceTrackerCustomizer;
18 import org.opendaylight.yangtools.yang.model.api.Module;
19 import org.opendaylight.yangtools.yang.model.parser.api.YangModelParser;
20 import org.opendaylight.yangtools.yang.parser.impl.YangParserImpl;
21 import org.osgi.framework.Bundle;
22 import org.osgi.framework.BundleActivator;
23 import org.osgi.framework.BundleContext;
24 import org.osgi.framework.BundleEvent;
25 import org.osgi.framework.ServiceReference;
26 import org.opendaylight.yangtools.concepts.ListenerRegistration;
27 import org.opendaylight.yangtools.concepts.util.ListenerRegistry;
28 import org.opendaylight.controller.sal.core.api.model.SchemaService;
29 import org.opendaylight.controller.sal.core.api.model.SchemaServiceListener;
30 import org.slf4j.Logger;
31 import org.slf4j.LoggerFactory;
32
33 import com.google.common.base.Function;
34 import com.google.common.base.Optional;
35 import com.google.common.base.Preconditions;
36 import com.google.common.collect.Collections2;
37 import com.google.common.collect.HashMultimap;
38 import com.google.common.collect.Multimap;
39 import com.google.common.collect.Sets;
40
41 import static com.google.common.base.Preconditions.*;
42
43 public class SchemaServiceImpl implements //
44         SchemaService, //
45         ServiceTrackerCustomizer<SchemaServiceListener, SchemaServiceListener>, //
46         AutoCloseable {
47     private static final Logger logger = LoggerFactory.getLogger(SchemaServiceImpl.class);
48
49     private ListenerRegistry<SchemaServiceListener> listeners;
50     private YangModelParser parser;
51
52     private BundleContext context;
53     private BundleScanner scanner = new BundleScanner();
54
55     /**
56      * Map of currently problematic yang files that should get fixed eventually
57      * after all events are received.
58      */
59     private final Multimap<Bundle, URL> inconsistentBundlesToYangURLs = HashMultimap.create();
60     private final Multimap<Bundle, URL> consistentBundlesToYangURLs = HashMultimap.create();
61     private BundleTracker<Object> bundleTracker;
62     private final YangStoreCache cache = new YangStoreCache();
63
64     private ServiceTracker<SchemaServiceListener, SchemaServiceListener> listenerTracker;
65
66     public ListenerRegistry<SchemaServiceListener> getListeners() {
67         return listeners;
68     }
69
70     public void setListeners(ListenerRegistry<SchemaServiceListener> listeners) {
71         this.listeners = listeners;
72     }
73
74     public YangModelParser getParser() {
75         return parser;
76     }
77
78     public void setParser(YangModelParser parser) {
79         this.parser = parser;
80     }
81
82     public BundleContext getContext() {
83         return context;
84     }
85
86     public void setContext(BundleContext context) {
87         this.context = context;
88     }
89
90     public void start() {
91         checkState(parser != null);
92         checkState(context != null);
93         if (listeners == null) {
94             listeners = new ListenerRegistry<>();
95         }
96
97         listenerTracker = new ServiceTracker<>(context, SchemaServiceListener.class, this);
98         bundleTracker = new BundleTracker<Object>(context, BundleEvent.RESOLVED | BundleEvent.UNRESOLVED, scanner);
99         bundleTracker.open();
100         listenerTracker.open();
101     }
102
103     public SchemaContext getGlobalContext() {
104         return getSchemaContextSnapshot();
105     }
106
107     public synchronized SchemaContext getSchemaContextSnapshot() {
108         Optional<SchemaContext> yangStoreOpt = cache.getCachedSchemaContext(consistentBundlesToYangURLs);
109         if (yangStoreOpt.isPresent()) {
110             return yangStoreOpt.get();
111         }
112         SchemaContext snapshot = createSnapshot(parser, consistentBundlesToYangURLs);
113         updateCache(snapshot);
114         return snapshot;
115     }
116
117     @Override
118     public void addModule(Module module) {
119         // TODO Auto-generated method stub
120         throw new UnsupportedOperationException();
121     }
122
123     @Override
124     public SchemaContext getSessionContext() {
125         // TODO Auto-generated method stub
126         throw new UnsupportedOperationException();
127     }
128
129     @Override
130     public void removeModule(Module module) {
131         // TODO Auto-generated method stub
132         throw new UnsupportedOperationException();
133     }
134
135     @Override
136     public ListenerRegistration<SchemaServiceListener> registerSchemaServiceListener(SchemaServiceListener listener) {
137         return listeners.register(listener);
138     }
139
140     @Override
141     public void close() throws Exception {
142         bundleTracker.close();
143         // FIXME: Add listeners.close();
144
145     }
146
147     private synchronized boolean tryToUpdateState(Collection<URL> changedURLs, Multimap<Bundle, URL> proposedNewState,
148             boolean adding) {
149         Preconditions.checkArgument(changedURLs.size() > 0, "No change can occur when no URLs are changed");
150
151         try {
152             // consistent state
153             // merge into
154             SchemaContext snapshot = createSnapshot(parser, proposedNewState);
155             consistentBundlesToYangURLs.clear();
156             consistentBundlesToYangURLs.putAll(proposedNewState);
157             inconsistentBundlesToYangURLs.clear();
158             // update cache
159             updateCache(snapshot);
160             logger.info("SchemaService updated to new consistent state");
161             logger.trace("SchemaService  updated to new consistent state containing {}", consistentBundlesToYangURLs);
162
163             // notifyListeners(changedURLs, adding);
164             return true;
165         } catch (Exception e) {
166             // inconsistent state
167             logger.debug(
168                     "SchemaService is falling back on last consistent state containing {}, inconsistent yang files {}, reason {}",
169                     consistentBundlesToYangURLs, inconsistentBundlesToYangURLs, e.toString());
170             return false;
171         }
172     }
173
174     private static Collection<InputStream> fromUrlsToInputStreams(Multimap<Bundle, URL> multimap) {
175         return Collections2.transform(multimap.values(), new Function<URL, InputStream>() {
176
177             @Override
178             public InputStream apply(URL url) {
179                 try {
180                     return url.openStream();
181                 } catch (IOException e) {
182                     logger.warn("Unable to open stream from {}", url);
183                     throw new IllegalStateException("Unable to open stream from " + url, e);
184                 }
185             }
186         });
187     }
188
189     private static SchemaContext createSnapshot(YangModelParser parser, Multimap<Bundle, URL> multimap) {
190         List<InputStream> models = new ArrayList<>(fromUrlsToInputStreams(multimap));
191         Set<Module> modules = parser.parseYangModelsFromStreams(models);
192         SchemaContext yangStoreSnapshot = parser.resolveSchemaContext(modules);
193         return yangStoreSnapshot;
194     }
195
196     private void updateCache(SchemaContext snapshot) {
197         cache.cacheYangStore(consistentBundlesToYangURLs, snapshot);
198
199         Object[] services = listenerTracker.getServices();
200         if (services != null) {
201             for (Object rawListener : services) {
202                 SchemaServiceListener listener = (SchemaServiceListener) rawListener;
203                 try {
204                     listener.onGlobalContextUpdated(snapshot);
205                 } catch (Exception e) {
206                     logger.error("Exception occured during invoking listener", e);
207                 }
208             }
209         }
210         for (ListenerRegistration<SchemaServiceListener> listener : listeners) {
211             try {
212                 listener.getInstance().onGlobalContextUpdated(snapshot);
213             } catch (Exception e) {
214                 logger.error("Exception occured during invoking listener", e);
215             }
216         }
217     }
218
219     private class BundleScanner implements BundleTrackerCustomizer<Object> {
220         @Override
221         public Object addingBundle(Bundle bundle, BundleEvent event) {
222
223             // Ignore system bundle:
224             // system bundle might have config-api on classpath &&
225             // config-api contains yang files =>
226             // system bundle might contain yang files from that bundle
227             if (bundle.getBundleId() == 0)
228                 return bundle;
229
230             Enumeration<URL> enumeration = bundle.findEntries("META-INF/yang", "*.yang", false);
231             if (enumeration != null && enumeration.hasMoreElements()) {
232                 synchronized (this) {
233                     List<URL> addedURLs = new ArrayList<>();
234                     while (enumeration.hasMoreElements()) {
235                         URL url = enumeration.nextElement();
236                         addedURLs.add(url);
237                     }
238                     logger.trace("Bundle {} has event {}, bundle state {}, URLs {}", bundle, event, bundle.getState(),
239                             addedURLs);
240                     // test that yang store is consistent
241                     Multimap<Bundle, URL> proposedNewState = HashMultimap.create(consistentBundlesToYangURLs);
242                     proposedNewState.putAll(inconsistentBundlesToYangURLs);
243                     proposedNewState.putAll(bundle, addedURLs);
244                     boolean adding = true;
245
246                     if (tryToUpdateState(addedURLs, proposedNewState, adding) == false) {
247                         inconsistentBundlesToYangURLs.putAll(bundle, addedURLs);
248                     }
249                 }
250             }
251             return bundle;
252         }
253
254         @Override
255         public void modifiedBundle(Bundle bundle, BundleEvent event, Object object) {
256             logger.debug("Modified bundle {} {} {}", bundle, event, object);
257         }
258
259         /**
260          * If removing YANG files makes yang store inconsistent, method
261          * {@link #getYangStoreSnapshot()} will throw exception. There is no
262          * rollback.
263          */
264
265         @Override
266         public synchronized void removedBundle(Bundle bundle, BundleEvent event, Object object) {
267             inconsistentBundlesToYangURLs.removeAll(bundle);
268             Collection<URL> consistentURLsToBeRemoved = consistentBundlesToYangURLs.removeAll(bundle);
269
270             if (consistentURLsToBeRemoved.isEmpty()) {
271                 return; // no change
272             }
273             boolean adding = false;
274             // notifyListeners(consistentURLsToBeRemoved, adding);
275         }
276     }
277
278     private static final class YangStoreCache {
279
280         Set<URL> cachedUrls;
281         SchemaContext cachedContextSnapshot;
282
283         Optional<SchemaContext> getCachedSchemaContext(Multimap<Bundle, URL> bundlesToYangURLs) {
284             Set<URL> urls = setFromMultimapValues(bundlesToYangURLs);
285             if (cachedUrls != null && cachedUrls.equals(urls)) {
286                 Preconditions.checkState(cachedContextSnapshot != null);
287                 return Optional.of(cachedContextSnapshot);
288             }
289             return Optional.absent();
290         }
291
292         private static Set<URL> setFromMultimapValues(Multimap<Bundle, URL> bundlesToYangURLs) {
293             Set<URL> urls = Sets.newHashSet(bundlesToYangURLs.values());
294             Preconditions.checkState(bundlesToYangURLs.size() == urls.size());
295             return urls;
296         }
297
298         void cacheYangStore(Multimap<Bundle, URL> urls, SchemaContext ctx) {
299             this.cachedUrls = setFromMultimapValues(urls);
300             this.cachedContextSnapshot = ctx;
301         }
302     }
303
304     @Override
305     public SchemaServiceListener addingService(ServiceReference<SchemaServiceListener> reference) {
306
307         SchemaServiceListener listener = context.getService(reference);
308         SchemaContext _ctxContext = getGlobalContext();
309         if (getContext() != null) {
310             listener.onGlobalContextUpdated(_ctxContext);
311         }
312         return listener;
313     }
314
315     @Override
316     public void modifiedService(ServiceReference<SchemaServiceListener> reference, SchemaServiceListener service) {
317         // NOOP
318     }
319
320     @Override
321     public void removedService(ServiceReference<SchemaServiceListener> reference, SchemaServiceListener service) {
322         context.ungetService(reference);
323     }
324 }