Merge "Queue yang model"
[controller.git] / opendaylight / md-sal / sal-dom-broker / src / main / java / org / opendaylight / controller / sal / dom / broker / SchemaServiceImpl.java
1 package org.opendaylight.controller.sal.dom.broker;
2
3 import java.io.IOException;
4 import java.io.InputStream;
5 import java.net.URL;
6 import java.util.ArrayList;
7 import java.util.Collection;
8 import java.util.Enumeration;
9 import java.util.List;
10 import java.util.Set;
11 import java.util.zip.Checksum;
12
13 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
14 import org.osgi.util.tracker.BundleTracker;
15 import org.osgi.util.tracker.BundleTrackerCustomizer;
16 import org.osgi.util.tracker.ServiceTracker;
17 import org.osgi.util.tracker.ServiceTrackerCustomizer;
18 import org.opendaylight.yangtools.yang.model.api.Module;
19 import org.opendaylight.yangtools.yang.model.parser.api.YangModelParser;
20 import org.opendaylight.yangtools.yang.parser.impl.YangParserImpl;
21 import org.osgi.framework.Bundle;
22 import org.osgi.framework.BundleActivator;
23 import org.osgi.framework.BundleContext;
24 import org.osgi.framework.BundleEvent;
25 import org.osgi.framework.ServiceReference;
26 import org.opendaylight.yangtools.concepts.ListenerRegistration;
27 import org.opendaylight.yangtools.concepts.util.ListenerRegistry;
28 import org.opendaylight.controller.sal.core.api.model.SchemaService;
29 import org.opendaylight.controller.sal.core.api.model.SchemaServiceListener;
30 import org.slf4j.Logger;
31 import org.slf4j.LoggerFactory;
32
33 import com.google.common.base.Function;
34 import com.google.common.base.Optional;
35 import com.google.common.base.Preconditions;
36 import com.google.common.collect.Collections2;
37 import com.google.common.collect.HashMultimap;
38 import com.google.common.collect.Multimap;
39 import com.google.common.collect.Sets;
40
41 import static com.google.common.base.Preconditions.*;
42
43 public class SchemaServiceImpl implements //
44 SchemaService, //
45 ServiceTrackerCustomizer<SchemaServiceListener, SchemaServiceListener>, //
46 AutoCloseable {
47     private static final Logger logger = LoggerFactory.getLogger(SchemaServiceImpl.class);
48
49     private ListenerRegistry<SchemaServiceListener> listeners;
50     private YangModelParser parser;
51
52     private BundleContext context;
53     private BundleScanner scanner = new BundleScanner();
54
55     /**
56      * Map of currently problematic yang files that should get fixed eventually
57      * after all events are received.
58      */
59     private final Multimap<Bundle, URL> inconsistentBundlesToYangURLs = HashMultimap.create();
60     private final Multimap<Bundle, URL> consistentBundlesToYangURLs = HashMultimap.create();
61     private BundleTracker<Object> bundleTracker;
62     private final YangStoreCache cache = new YangStoreCache();
63
64     private ServiceTracker<SchemaServiceListener,SchemaServiceListener> listenerTracker;
65
66     public ListenerRegistry<SchemaServiceListener> getListeners() {
67         return listeners;
68     }
69
70     public void setListeners(ListenerRegistry<SchemaServiceListener> listeners) {
71         this.listeners = listeners;
72     }
73
74     public YangModelParser getParser() {
75         return parser;
76     }
77
78     public void setParser(YangModelParser parser) {
79         this.parser = parser;
80     }
81
82     public BundleContext getContext() {
83         return context;
84     }
85
86     public void setContext(BundleContext context) {
87         this.context = context;
88     }
89
90     public void start() {
91         checkState(parser != null);
92         checkState(context != null);
93         if (listeners == null) {
94             listeners = new ListenerRegistry<>();
95         }
96         
97         listenerTracker = new ServiceTracker<>(context, SchemaServiceListener.class, this);
98         bundleTracker = new BundleTracker<Object>(context, BundleEvent.RESOLVED | BundleEvent.UNRESOLVED, scanner);
99         bundleTracker.open();
100         listenerTracker.open();
101     }
102
103     public SchemaContext getGlobalContext() {
104         return getSchemaContextSnapshot();
105     }
106
107     public synchronized SchemaContext getSchemaContextSnapshot() {
108         Optional<SchemaContext> yangStoreOpt = cache.getCachedSchemaContext(consistentBundlesToYangURLs);
109         if (yangStoreOpt.isPresent()) {
110             return yangStoreOpt.get();
111         }
112         SchemaContext snapshot = createSnapshot(parser, consistentBundlesToYangURLs);
113         updateCache(snapshot);
114         return snapshot;
115     }
116
117     @Override
118     public void addModule(Module module) {
119         // TODO Auto-generated method stub
120         throw new UnsupportedOperationException();
121     }
122
123     @Override
124     public SchemaContext getSessionContext() {
125         // TODO Auto-generated method stub
126         throw new UnsupportedOperationException();
127     }
128
129     @Override
130     public void removeModule(Module module) {
131         // TODO Auto-generated method stub
132         throw new UnsupportedOperationException();
133     }
134
135     
136     @Override
137     public ListenerRegistration<SchemaServiceListener> registerSchemaServiceListener(SchemaServiceListener listener) {
138         return listeners.register(listener);
139     }
140     
141     @Override
142     public void close() throws Exception {
143         bundleTracker.close();
144         // FIXME: Add listeners.close();
145
146     }
147
148     private synchronized boolean tryToUpdateState(Collection<URL> changedURLs, Multimap<Bundle, URL> proposedNewState,
149             boolean adding) {
150         Preconditions.checkArgument(changedURLs.size() > 0, "No change can occur when no URLs are changed");
151
152         try {
153             // consistent state
154             // merge into
155             SchemaContext snapshot = createSnapshot(parser, proposedNewState);
156             consistentBundlesToYangURLs.clear();
157             consistentBundlesToYangURLs.putAll(proposedNewState);
158             inconsistentBundlesToYangURLs.clear();
159             // update cache
160             updateCache(snapshot);
161             logger.info("SchemaService updated to new consistent state");
162             logger.trace("SchemaService  updated to new consistent state containing {}", consistentBundlesToYangURLs);
163
164             // notifyListeners(changedURLs, adding);
165             return true;
166         } catch (Exception e) {
167             // inconsistent state
168             logger.debug(
169                     "SchemaService is falling back on last consistent state containing {}, inconsistent yang files {}, reason {}",
170                     consistentBundlesToYangURLs, inconsistentBundlesToYangURLs, e.toString());
171             return false;
172         }
173     }
174
175     private static Collection<InputStream> fromUrlsToInputStreams(Multimap<Bundle, URL> multimap) {
176         return Collections2.transform(multimap.values(), new Function<URL, InputStream>() {
177
178             @Override
179             public InputStream apply(URL url) {
180                 try {
181                     return url.openStream();
182                 } catch (IOException e) {
183                     logger.warn("Unable to open stream from {}", url);
184                     throw new IllegalStateException("Unable to open stream from " + url, e);
185                 }
186             }
187         });
188     }
189
190     private static SchemaContext createSnapshot(YangModelParser parser, Multimap<Bundle, URL> multimap) {
191         List<InputStream> models = new ArrayList<>(fromUrlsToInputStreams(multimap));
192         Set<Module> modules = parser.parseYangModelsFromStreams(models);
193         SchemaContext yangStoreSnapshot = parser.resolveSchemaContext(modules);
194         return yangStoreSnapshot;
195     }
196
197     private void updateCache(SchemaContext snapshot) {
198         cache.cacheYangStore(consistentBundlesToYangURLs, snapshot);
199         
200         Object[] services = listenerTracker.getServices();
201         if(services != null) {
202             for(Object rawListener : services) {
203                 SchemaServiceListener listener = (SchemaServiceListener) rawListener;
204                 try {
205                     listener.onGlobalContextUpdated(snapshot);
206                 } catch (Exception e) {
207                     logger.error("Exception occured during invoking listener",e);
208                 }
209             }
210         }
211         for (ListenerRegistration<SchemaServiceListener> listener : listeners) {
212             try {
213                 listener.getInstance().onGlobalContextUpdated(snapshot);
214             } catch (Exception e) {
215                 logger.error("Exception occured during invoking listener",e);
216             }
217         }
218     }
219
220     private class BundleScanner implements BundleTrackerCustomizer<Object> {
221         @Override
222         public Object addingBundle(Bundle bundle, BundleEvent event) {
223
224             // Ignore system bundle:
225             // system bundle might have config-api on classpath &&
226             // config-api contains yang files =>
227             // system bundle might contain yang files from that bundle
228             if (bundle.getBundleId() == 0)
229                 return bundle;
230
231             Enumeration<URL> enumeration = bundle.findEntries("META-INF/yang", "*.yang", false);
232             if (enumeration != null && enumeration.hasMoreElements()) {
233                 synchronized (this) {
234                     List<URL> addedURLs = new ArrayList<>();
235                     while (enumeration.hasMoreElements()) {
236                         URL url = enumeration.nextElement();
237                         addedURLs.add(url);
238                     }
239                     logger.trace("Bundle {} has event {}, bundle state {}, URLs {}", bundle, event, bundle.getState(),
240                             addedURLs);
241                     // test that yang store is consistent
242                     Multimap<Bundle, URL> proposedNewState = HashMultimap.create(consistentBundlesToYangURLs);
243                     proposedNewState.putAll(inconsistentBundlesToYangURLs);
244                     proposedNewState.putAll(bundle, addedURLs);
245                     boolean adding = true;
246                     
247                     if (tryToUpdateState(addedURLs, proposedNewState, adding) == false) {
248                         inconsistentBundlesToYangURLs.putAll(bundle, addedURLs);
249                     }
250                 }
251             }
252             return bundle;
253         }
254
255         @Override
256         public void modifiedBundle(Bundle bundle, BundleEvent event, Object object) {
257             logger.debug("Modified bundle {} {} {}", bundle, event, object);
258         }
259
260         /**
261          * If removing YANG files makes yang store inconsistent, method
262          * {@link #getYangStoreSnapshot()} will throw exception. There is no
263          * rollback.
264          */
265
266         @Override
267         public synchronized void removedBundle(Bundle bundle, BundleEvent event, Object object) {
268             inconsistentBundlesToYangURLs.removeAll(bundle);
269             Collection<URL> consistentURLsToBeRemoved = consistentBundlesToYangURLs.removeAll(bundle);
270
271             if (consistentURLsToBeRemoved.isEmpty()) {
272                 return; // no change
273             }
274             boolean adding = false;
275             // notifyListeners(consistentURLsToBeRemoved, adding);
276         }
277     }
278
279     private static final class YangStoreCache {
280
281         Set<URL> cachedUrls;
282         SchemaContext cachedContextSnapshot;
283
284         Optional<SchemaContext> getCachedSchemaContext(Multimap<Bundle, URL> bundlesToYangURLs) {
285             Set<URL> urls = setFromMultimapValues(bundlesToYangURLs);
286             if (cachedUrls != null && cachedUrls.equals(urls)) {
287                 Preconditions.checkState(cachedContextSnapshot != null);
288                 return Optional.of(cachedContextSnapshot);
289             }
290             return Optional.absent();
291         }
292
293         private static Set<URL> setFromMultimapValues(Multimap<Bundle, URL> bundlesToYangURLs) {
294             Set<URL> urls = Sets.newHashSet(bundlesToYangURLs.values());
295             Preconditions.checkState(bundlesToYangURLs.size() == urls.size());
296             return urls;
297         }
298
299         void cacheYangStore(Multimap<Bundle, URL> urls, SchemaContext ctx) {
300             this.cachedUrls = setFromMultimapValues(urls);
301             this.cachedContextSnapshot = ctx;
302         }
303     }
304     
305     @Override
306     public SchemaServiceListener addingService(ServiceReference<SchemaServiceListener> reference) {
307         
308         SchemaServiceListener listener = context.getService(reference);
309         SchemaContext _ctxContext = getGlobalContext();
310         if(getContext() != null) {
311             listener.onGlobalContextUpdated(_ctxContext);
312         }
313         return listener;
314     }
315     
316     @Override
317     public void modifiedService(ServiceReference<SchemaServiceListener> reference, SchemaServiceListener service) {
318         // NOOP
319     }
320     
321     @Override
322     public void removedService(ServiceReference<SchemaServiceListener> reference, SchemaServiceListener service) {
323         context.ungetService(reference);
324     }
325 }