072a2dabf2b2eebd6eb9b1153446c4c77839668c
[yangtools.git] / parser / yang-parser-impl / src / main / java / org / opendaylight / yangtools / yang / parser / repo / SharedEffectiveModelContextFactory.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  * Copyright (c) 2021 PANTHEON.tech, s.r.o.
4  *
5  * This program and the accompanying materials are made available under the
6  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
7  * and is available at http://www.eclipse.org/legal/epl-v10.html
8  */
9 package org.opendaylight.yangtools.yang.parser.repo;
10
11 import static com.google.common.base.Verify.verify;
12 import static java.util.Objects.requireNonNull;
13
14 import com.google.common.base.Stopwatch;
15 import com.google.common.collect.Collections2;
16 import com.google.common.collect.ImmutableSet;
17 import com.google.common.collect.Iterables;
18 import com.google.common.util.concurrent.FutureCallback;
19 import com.google.common.util.concurrent.Futures;
20 import com.google.common.util.concurrent.ListenableFuture;
21 import com.google.common.util.concurrent.MoreExecutors;
22 import com.google.common.util.concurrent.SettableFuture;
23 import java.lang.invoke.MethodHandles;
24 import java.lang.invoke.VarHandle;
25 import java.lang.ref.Cleaner;
26 import java.lang.ref.Reference;
27 import java.lang.ref.SoftReference;
28 import java.lang.ref.WeakReference;
29 import java.util.Collection;
30 import java.util.List;
31 import java.util.Set;
32 import java.util.concurrent.ConcurrentHashMap;
33 import java.util.concurrent.ConcurrentMap;
34 import java.util.function.Function;
35 import org.eclipse.jdt.annotation.NonNull;
36 import org.eclipse.jdt.annotation.Nullable;
37 import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
38 import org.opendaylight.yangtools.yang.model.repo.api.EffectiveModelContextFactory;
39 import org.opendaylight.yangtools.yang.model.repo.api.SchemaContextFactoryConfiguration;
40 import org.opendaylight.yangtools.yang.model.repo.api.SchemaRepository;
41 import org.opendaylight.yangtools.yang.model.repo.api.SourceIdentifier;
42 import org.opendaylight.yangtools.yang.model.repo.api.YangIRSchemaSource;
43 import org.slf4j.Logger;
44 import org.slf4j.LoggerFactory;
45
46 /**
47  * An almost-simple cache. EffectiveModel computation is explicitly asynchronous and we are also threadless, i.e. we
48  * hijack repository threads to do our work.
49  */
50 final class SharedEffectiveModelContextFactory implements EffectiveModelContextFactory {
51     private static final class CacheEntry {
52         private static final Function<EffectiveModelContext, Reference<EffectiveModelContext>> REF;
53         private static final VarHandle STATE;
54
55         static {
56             try {
57                 STATE = MethodHandles.lookup().findVarHandle(CacheEntry.class, "state", Object.class);
58             } catch (NoSuchFieldException | IllegalAccessException e) {
59                 throw new ExceptionInInitializerError(e);
60             }
61
62             String prop = System.getProperty("org.opendaylight.yangtools.yang.parser.repo.shared-refs", "weak");
63             REF = switch (prop) {
64                 case "soft" -> SoftReference::new;
65                 case "weak" -> WeakReference::new;
66                 default -> {
67                     LOG.warn("Invalid shared-refs \"{}\", defaulting to weak references", prop);
68                     prop = "weak";
69                     yield WeakReference::new;
70                 }
71             };
72             LOG.info("Using {} references", prop);
73         }
74
75         // This field can be in one of two states:
76         // - SettableFuture, in which case the model is being computed
77         // - Reference, in which case the model is available through the reference (unless cleared)
78         @SuppressWarnings("unused")
79         private volatile Object state = SettableFuture.create();
80
81         @SuppressWarnings("unchecked")
82         @Nullable ListenableFuture<EffectiveModelContext> future() {
83             final Object local = STATE.getAcquire(this);
84             if (local instanceof SettableFuture) {
85                 return (SettableFuture<EffectiveModelContext>) local;
86             }
87             verify(local instanceof Reference, "Unexpected state %s", local);
88             final EffectiveModelContext model = ((Reference<EffectiveModelContext>) local).get();
89             return model == null ? null : Futures.immediateFuture(model);
90         }
91
92         @SuppressWarnings("unchecked")
93         @NonNull SettableFuture<EffectiveModelContext> getFuture() {
94             final Object local = STATE.getAcquire(this);
95             verify(local instanceof SettableFuture, "Unexpected state %s", local);
96             return (SettableFuture<EffectiveModelContext>) local;
97         }
98
99         void resolve(final EffectiveModelContext context) {
100             final SettableFuture<EffectiveModelContext> future = getFuture();
101             // Publish a weak reference before triggering any listeners on the future so that newcomers can see it
102             final Object witness = STATE.compareAndExchangeRelease(this, future, REF.apply(context));
103             verify(witness == future, "Unexpected witness %s", witness);
104             future.set(context);
105         }
106     }
107
108
109     private static final Logger LOG = LoggerFactory.getLogger(SharedEffectiveModelContextFactory.class);
110     private static final Cleaner CLEANER = Cleaner.create();
111
112     private final ConcurrentMap<Set<SourceIdentifier>, CacheEntry> cache = new ConcurrentHashMap<>();
113     private final AssembleSources assembleSources;
114     private final SchemaRepository repository;
115
116     SharedEffectiveModelContextFactory(final @NonNull SharedSchemaRepository repository,
117             final @NonNull SchemaContextFactoryConfiguration config) {
118         this.repository = requireNonNull(repository);
119         assembleSources = new AssembleSources(repository.factory(), config);
120
121     }
122
123     @Override
124     public @NonNull ListenableFuture<EffectiveModelContext> createEffectiveModelContext(
125             final @NonNull Collection<SourceIdentifier> requiredSources) {
126         return createEffectiveModel(dedupSources(requiredSources));
127     }
128
129     @NonNull ListenableFuture<EffectiveModelContext> createEffectiveModel(final Set<SourceIdentifier> sources) {
130         final CacheEntry existing = cache.get(sources);
131         return existing != null ? acquireModel(sources, existing) : computeModel(sources);
132     }
133
134     // We may have an entry, but we do not know in what state it is in: it may be stable, it may be being built up
135     // or in process of being retired.
136     private @NonNull ListenableFuture<EffectiveModelContext> acquireModel(final Set<SourceIdentifier> sources,
137             final @NonNull CacheEntry entry) {
138         // Request a future from the entry, which indicates the context is either available or being constructed
139         final ListenableFuture<EffectiveModelContext> existing = entry.future();
140         if (existing != null) {
141             return existing;
142         }
143         // The entry cannot satisfy our request: remove it and fall back to computation
144         cache.remove(sources, entry);
145         return computeModel(sources);
146     }
147
148     private @NonNull ListenableFuture<EffectiveModelContext> computeModel(final Set<SourceIdentifier> sources) {
149         // Insert a new entry until we succeed or there is a workable entry
150         final CacheEntry ourEntry = new CacheEntry();
151         while (true) {
152             final CacheEntry prevEntry = cache.putIfAbsent(sources, ourEntry);
153             if (prevEntry == null) {
154                 // successful insert
155                 break;
156             }
157
158             // ... okay, we have raced, but is the entry still usable?
159             final ListenableFuture<EffectiveModelContext> existing = prevEntry.future();
160             if (existing != null) {
161                 // .. yup, we are done here
162                 return existing;
163             }
164
165             // ... no dice, remove the entry and retry
166             cache.remove(sources, prevEntry);
167         }
168
169         // Acquire the future first, then kick off computation. That way we do not need to worry about races around
170         // EffectiveModelContext being garbage-collected just after have computed it and before we have acquired a
171         // reference to it.
172         final ListenableFuture<EffectiveModelContext> result = ourEntry.getFuture();
173         resolveEntry(sources, ourEntry);
174         return result;
175     }
176
177     private void resolveEntry(final Set<SourceIdentifier> sources, final CacheEntry entry) {
178         LOG.debug("Starting assembly of {} sources", sources.size());
179         final Stopwatch sw = Stopwatch.createStarted();
180
181         // Request all sources be loaded
182         ListenableFuture<List<YangIRSchemaSource>> sf = Futures.allAsList(Collections2.transform(sources,
183             identifier -> repository.getSchemaSource(identifier, YangIRSchemaSource.class)));
184
185         // Detect mismatch between requested Source IDs and IDs that are extracted from parsed source
186         // Also remove duplicates if present
187         // We are relying on preserved order of uniqueSourceIdentifiers as well as sf
188         sf = Futures.transform(sf, new SourceIdMismatchDetector(sources), MoreExecutors.directExecutor());
189
190         // Assemble sources into a schema context
191         final ListenableFuture<EffectiveModelContext> cf = Futures.transformAsync(sf, assembleSources,
192             MoreExecutors.directExecutor());
193
194         // FIXME: we do not deal with invalidation here. We should monitor the repository for changes in source schemas
195         //        and react appropriately:
196         //        - in case we failed certainly want to invalidate the entry
197         //        - in case of success ... that's something to consider
198         Futures.addCallback(cf, new FutureCallback<>() {
199             @Override
200             public void onSuccess(final EffectiveModelContext result) {
201                 LOG.debug("Finished assembly of {} sources in {}", sources.size(), sw);
202
203                 // Remove the entry when the context is GC'd
204                 final Stopwatch residence = Stopwatch.createStarted();
205                 CLEANER.register(result, () -> {
206                     LOG.debug("Removing entry after {}", residence);
207                     cache.remove(sources, entry);
208                 });
209
210                 // Flip the entry to resolved
211                 entry.resolve(result);
212             }
213
214             @Override
215             public void onFailure(final Throwable cause) {
216                 LOG.debug("Failed assembly of {} in {}", sources, sw, cause);
217                 entry.getFuture().setException(cause);
218
219                 // remove failed result from the cache so it can be recomputed, as this might have been a transient
220                 // problem.
221                 cache.remove(sources, entry);
222             }
223         }, MoreExecutors.directExecutor());
224     }
225
226     /**
227      * Return a set of de-duplicated inputs.
228      *
229      * @return set (preserving ordering) from the input collection
230      */
231     private static ImmutableSet<SourceIdentifier> dedupSources(final Collection<SourceIdentifier> sources) {
232         final ImmutableSet<SourceIdentifier> result = ImmutableSet.copyOf(sources);
233         if (result.size() != sources.size()) {
234             LOG.warn("Duplicate sources requested for schema context, removed duplicate sources: {}",
235                 Collections2.filter(result, input -> Iterables.frequency(sources, input) > 1));
236         }
237         return result;
238     }
239 }