Promote SchemaSourceRepresentation
[yangtools.git] / parser / yang-parser-impl / src / main / java / org / opendaylight / yangtools / yang / parser / repo / SharedEffectiveModelContextFactory.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  * Copyright (c) 2021 PANTHEON.tech, s.r.o.
4  *
5  * This program and the accompanying materials are made available under the
6  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
7  * and is available at http://www.eclipse.org/legal/epl-v10.html
8  */
9 package org.opendaylight.yangtools.yang.parser.repo;
10
11 import static com.google.common.base.Verify.verify;
12 import static java.util.Objects.requireNonNull;
13
14 import com.google.common.base.Stopwatch;
15 import com.google.common.collect.Collections2;
16 import com.google.common.collect.ImmutableSet;
17 import com.google.common.collect.Iterables;
18 import com.google.common.util.concurrent.FutureCallback;
19 import com.google.common.util.concurrent.Futures;
20 import com.google.common.util.concurrent.ListenableFuture;
21 import com.google.common.util.concurrent.MoreExecutors;
22 import com.google.common.util.concurrent.SettableFuture;
23 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
24 import java.lang.invoke.MethodHandles;
25 import java.lang.invoke.VarHandle;
26 import java.lang.ref.Cleaner;
27 import java.lang.ref.Reference;
28 import java.lang.ref.SoftReference;
29 import java.lang.ref.WeakReference;
30 import java.util.Collection;
31 import java.util.List;
32 import java.util.Set;
33 import java.util.concurrent.ConcurrentHashMap;
34 import java.util.concurrent.ConcurrentMap;
35 import java.util.function.Function;
36 import org.eclipse.jdt.annotation.NonNull;
37 import org.eclipse.jdt.annotation.Nullable;
38 import org.opendaylight.yangtools.yang.ir.YangIRSchemaSource;
39 import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
40 import org.opendaylight.yangtools.yang.model.api.source.SourceIdentifier;
41 import org.opendaylight.yangtools.yang.model.repo.api.EffectiveModelContextFactory;
42 import org.opendaylight.yangtools.yang.model.repo.api.SchemaContextFactoryConfiguration;
43 import org.opendaylight.yangtools.yang.model.repo.api.SchemaRepository;
44 import org.slf4j.Logger;
45 import org.slf4j.LoggerFactory;
46
47 /**
48  * An almost-simple cache. EffectiveModel computation is explicitly asynchronous and we are also threadless, i.e. we
49  * hijack repository threads to do our work.
50  */
51 final class SharedEffectiveModelContextFactory implements EffectiveModelContextFactory {
52     private static final class CacheEntry {
53         private static final Function<EffectiveModelContext, Reference<EffectiveModelContext>> REF;
54         private static final VarHandle STATE;
55
56         static {
57             try {
58                 STATE = MethodHandles.lookup().findVarHandle(CacheEntry.class, "state", Object.class);
59             } catch (NoSuchFieldException | IllegalAccessException e) {
60                 throw new ExceptionInInitializerError(e);
61             }
62
63             String prop = System.getProperty("org.opendaylight.yangtools.yang.parser.repo.shared-refs", "weak");
64             REF = switch (prop) {
65                 case "soft" -> SoftReference::new;
66                 case "weak" -> WeakReference::new;
67                 default -> {
68                     LOG.warn("Invalid shared-refs \"{}\", defaulting to weak references", prop);
69                     prop = "weak";
70                     yield WeakReference::new;
71                 }
72             };
73             LOG.info("Using {} references", prop);
74         }
75
76         // This field can be in one of two states:
77         // - SettableFuture, in which case the model is being computed
78         // - Reference, in which case the model is available through the reference (unless cleared)
79         @SuppressWarnings("unused")
80         @SuppressFBWarnings(value = "URF_UNREAD_FIELD",
81             justification = "https://github.com/spotbugs/spotbugs/issues/2749")
82         private volatile Object state = SettableFuture.create();
83
84         @SuppressWarnings("unchecked")
85         @Nullable ListenableFuture<EffectiveModelContext> future() {
86             final Object local = STATE.getAcquire(this);
87             if (local instanceof SettableFuture) {
88                 return (SettableFuture<EffectiveModelContext>) local;
89             }
90             verify(local instanceof Reference, "Unexpected state %s", local);
91             final EffectiveModelContext model = ((Reference<EffectiveModelContext>) local).get();
92             return model == null ? null : Futures.immediateFuture(model);
93         }
94
95         @SuppressWarnings("unchecked")
96         @NonNull SettableFuture<EffectiveModelContext> getFuture() {
97             final Object local = STATE.getAcquire(this);
98             verify(local instanceof SettableFuture, "Unexpected state %s", local);
99             return (SettableFuture<EffectiveModelContext>) local;
100         }
101
102         void resolve(final EffectiveModelContext context) {
103             final SettableFuture<EffectiveModelContext> future = getFuture();
104             // Publish a weak reference before triggering any listeners on the future so that newcomers can see it
105             final Object witness = STATE.compareAndExchangeRelease(this, future, REF.apply(context));
106             verify(witness == future, "Unexpected witness %s", witness);
107             future.set(context);
108         }
109     }
110
111
112     private static final Logger LOG = LoggerFactory.getLogger(SharedEffectiveModelContextFactory.class);
113     private static final Cleaner CLEANER = Cleaner.create();
114
115     private final ConcurrentMap<Set<SourceIdentifier>, CacheEntry> cache = new ConcurrentHashMap<>();
116     private final AssembleSources assembleSources;
117     private final SchemaRepository repository;
118
119     SharedEffectiveModelContextFactory(final @NonNull SharedSchemaRepository repository,
120             final @NonNull SchemaContextFactoryConfiguration config) {
121         this.repository = requireNonNull(repository);
122         assembleSources = new AssembleSources(repository.factory(), config);
123
124     }
125
126     @Override
127     public @NonNull ListenableFuture<EffectiveModelContext> createEffectiveModelContext(
128             final @NonNull Collection<SourceIdentifier> requiredSources) {
129         return createEffectiveModel(dedupSources(requiredSources));
130     }
131
132     @NonNull ListenableFuture<EffectiveModelContext> createEffectiveModel(final Set<SourceIdentifier> sources) {
133         final CacheEntry existing = cache.get(sources);
134         return existing != null ? acquireModel(sources, existing) : computeModel(sources);
135     }
136
137     // We may have an entry, but we do not know in what state it is in: it may be stable, it may be being built up
138     // or in process of being retired.
139     private @NonNull ListenableFuture<EffectiveModelContext> acquireModel(final Set<SourceIdentifier> sources,
140             final @NonNull CacheEntry entry) {
141         // Request a future from the entry, which indicates the context is either available or being constructed
142         final ListenableFuture<EffectiveModelContext> existing = entry.future();
143         if (existing != null) {
144             return existing;
145         }
146         // The entry cannot satisfy our request: remove it and fall back to computation
147         cache.remove(sources, entry);
148         return computeModel(sources);
149     }
150
151     private @NonNull ListenableFuture<EffectiveModelContext> computeModel(final Set<SourceIdentifier> sources) {
152         // Insert a new entry until we succeed or there is a workable entry
153         final CacheEntry ourEntry = new CacheEntry();
154         while (true) {
155             final CacheEntry prevEntry = cache.putIfAbsent(sources, ourEntry);
156             if (prevEntry == null) {
157                 // successful insert
158                 break;
159             }
160
161             // ... okay, we have raced, but is the entry still usable?
162             final ListenableFuture<EffectiveModelContext> existing = prevEntry.future();
163             if (existing != null) {
164                 // .. yup, we are done here
165                 return existing;
166             }
167
168             // ... no dice, remove the entry and retry
169             cache.remove(sources, prevEntry);
170         }
171
172         // Acquire the future first, then kick off computation. That way we do not need to worry about races around
173         // EffectiveModelContext being garbage-collected just after have computed it and before we have acquired a
174         // reference to it.
175         final ListenableFuture<EffectiveModelContext> result = ourEntry.getFuture();
176         resolveEntry(sources, ourEntry);
177         return result;
178     }
179
180     private void resolveEntry(final Set<SourceIdentifier> sources, final CacheEntry entry) {
181         LOG.debug("Starting assembly of {} sources", sources.size());
182         final Stopwatch sw = Stopwatch.createStarted();
183
184         // Request all sources be loaded
185         ListenableFuture<List<YangIRSchemaSource>> sf = Futures.allAsList(Collections2.transform(sources,
186             identifier -> repository.getSchemaSource(identifier, YangIRSchemaSource.class)));
187
188         // Detect mismatch between requested Source IDs and IDs that are extracted from parsed source
189         // Also remove duplicates if present
190         // We are relying on preserved order of uniqueSourceIdentifiers as well as sf
191         sf = Futures.transform(sf, new SourceIdMismatchDetector(sources), MoreExecutors.directExecutor());
192
193         // Assemble sources into a schema context
194         final ListenableFuture<EffectiveModelContext> cf = Futures.transformAsync(sf, assembleSources,
195             MoreExecutors.directExecutor());
196
197         // FIXME: we do not deal with invalidation here. We should monitor the repository for changes in source schemas
198         //        and react appropriately:
199         //        - in case we failed certainly want to invalidate the entry
200         //        - in case of success ... that's something to consider
201         Futures.addCallback(cf, new FutureCallback<>() {
202             @Override
203             public void onSuccess(final EffectiveModelContext result) {
204                 LOG.debug("Finished assembly of {} sources in {}", sources.size(), sw);
205
206                 // Remove the entry when the context is GC'd
207                 final Stopwatch residence = Stopwatch.createStarted();
208                 CLEANER.register(result, () -> {
209                     LOG.debug("Removing entry after {}", residence);
210                     cache.remove(sources, entry);
211                 });
212
213                 // Flip the entry to resolved
214                 entry.resolve(result);
215             }
216
217             @Override
218             public void onFailure(final Throwable cause) {
219                 LOG.debug("Failed assembly of {} in {}", sources, sw, cause);
220                 entry.getFuture().setException(cause);
221
222                 // remove failed result from the cache so it can be recomputed, as this might have been a transient
223                 // problem.
224                 cache.remove(sources, entry);
225             }
226         }, MoreExecutors.directExecutor());
227     }
228
229     /**
230      * Return a set of de-duplicated inputs.
231      *
232      * @return set (preserving ordering) from the input collection
233      */
234     private static ImmutableSet<SourceIdentifier> dedupSources(final Collection<SourceIdentifier> sources) {
235         final ImmutableSet<SourceIdentifier> result = ImmutableSet.copyOf(sources);
236         if (result.size() != sources.size()) {
237             LOG.warn("Duplicate sources requested for schema context, removed duplicate sources: {}",
238                 Collections2.filter(result, input -> Iterables.frequency(sources, input) > 1));
239         }
240         return result;
241     }
242 }