Refactor SharedSchemaContextFactory
[yangtools.git] / yang / yang-parser-impl / src / main / java / org / opendaylight / yangtools / yang / parser / repo / SharedEffectiveModelContextFactory.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  * Copyright (c) 2021 PANTHEON.tech, s.r.o.
4  *
5  * This program and the accompanying materials are made available under the
6  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
7  * and is available at http://www.eclipse.org/legal/epl-v10.html
8  */
9 package org.opendaylight.yangtools.yang.parser.repo;
10
11 import static com.google.common.base.Verify.verify;
12 import static java.util.Objects.requireNonNull;
13
14 import com.google.common.base.Stopwatch;
15 import com.google.common.collect.Collections2;
16 import com.google.common.collect.ImmutableSet;
17 import com.google.common.collect.Iterables;
18 import com.google.common.util.concurrent.FutureCallback;
19 import com.google.common.util.concurrent.Futures;
20 import com.google.common.util.concurrent.ListenableFuture;
21 import com.google.common.util.concurrent.MoreExecutors;
22 import com.google.common.util.concurrent.SettableFuture;
23 import java.lang.invoke.MethodHandles;
24 import java.lang.invoke.VarHandle;
25 import java.lang.ref.Cleaner;
26 import java.lang.ref.Reference;
27 import java.lang.ref.SoftReference;
28 import java.lang.ref.WeakReference;
29 import java.util.Collection;
30 import java.util.List;
31 import java.util.Set;
32 import java.util.concurrent.ConcurrentHashMap;
33 import java.util.concurrent.ConcurrentMap;
34 import java.util.function.Function;
35 import org.eclipse.jdt.annotation.NonNull;
36 import org.eclipse.jdt.annotation.Nullable;
37 import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
38 import org.opendaylight.yangtools.yang.model.repo.api.EffectiveModelContextFactory;
39 import org.opendaylight.yangtools.yang.model.repo.api.SchemaContextFactoryConfiguration;
40 import org.opendaylight.yangtools.yang.model.repo.api.SchemaRepository;
41 import org.opendaylight.yangtools.yang.model.repo.api.SourceIdentifier;
42 import org.opendaylight.yangtools.yang.parser.rfc7950.ir.IRSchemaSource;
43 import org.slf4j.Logger;
44 import org.slf4j.LoggerFactory;
45
46 /**
47  * An almost-simple cache. EffectiveModel computation is explicitly asynchronous and we are also threadless, i.e. we
48  * hijack repository threads to do our work.
49  */
50 final class SharedEffectiveModelContextFactory implements EffectiveModelContextFactory {
51     private static final class CacheEntry {
52         private static final Function<EffectiveModelContext, Reference<EffectiveModelContext>> REF;
53         private static final VarHandle STATE;
54
55         static {
56             try {
57                 STATE = MethodHandles.lookup().findVarHandle(CacheEntry.class, "state", Object.class);
58             } catch (NoSuchFieldException | IllegalAccessException e) {
59                 throw new ExceptionInInitializerError(e);
60             }
61
62             String prop = System.getProperty("org.opendaylight.yangtools.yang.parser.repo.shared-refs", "weak");
63             switch (prop) {
64                 case "soft":
65                     REF = SoftReference::new;
66                     break;
67                 case "weak":
68                     REF = WeakReference::new;
69                     break;
70                 default:
71                     LOG.warn("Invalid shared-refs \"{}\", defaulting to weak references", prop);
72                     prop = "weak";
73                     REF = WeakReference::new;
74             }
75             LOG.info("Using {} references", prop);
76         }
77
78         // This field can be in one of two states:
79         // - SettableFuture, in which case the model is being computed
80         // - Reference, in which case the model is available through the reference (unless cleared)
81         @SuppressWarnings("unused")
82         private volatile Object state = SettableFuture.create();
83
84         @SuppressWarnings("unchecked")
85         @Nullable ListenableFuture<EffectiveModelContext> future() {
86             final Object local = STATE.getAcquire(this);
87             if (local instanceof SettableFuture) {
88                 return (SettableFuture<EffectiveModelContext>) local;
89             }
90             verify(local instanceof Reference, "Unexpected state %s", local);
91             final EffectiveModelContext model = ((Reference<EffectiveModelContext>) local).get();
92             return model == null ? null : Futures.immediateFuture(model);
93         }
94
95         @SuppressWarnings("unchecked")
96         @NonNull SettableFuture<EffectiveModelContext> getFuture() {
97             final Object local = STATE.getAcquire(this);
98             verify(local instanceof SettableFuture, "Unexpected state %s", local);
99             return (SettableFuture<EffectiveModelContext>) local;
100         }
101
102         void resolve(final EffectiveModelContext context) {
103             final SettableFuture<EffectiveModelContext> future = getFuture();
104             // Publish a weak reference before triggering any listeners on the future so that newcomers can see it
105             final Object witness = STATE.compareAndExchangeRelease(this, future, REF.apply(context));
106             verify(witness == future, "Unexpected witness %s", witness);
107             future.set(context);
108         }
109     }
110
111
112     private static final Logger LOG = LoggerFactory.getLogger(SharedEffectiveModelContextFactory.class);
113     private static final Cleaner CLEANER = Cleaner.create();
114
115     private final ConcurrentMap<Set<SourceIdentifier>, CacheEntry> cache = new ConcurrentHashMap<>();
116     private final AssembleSources assembleSources;
117     private final SchemaRepository repository;
118
119     SharedEffectiveModelContextFactory(final @NonNull SharedSchemaRepository repository,
120             final @NonNull SchemaContextFactoryConfiguration config) {
121         this.repository = requireNonNull(repository);
122         this.assembleSources = new AssembleSources(repository.factory(), config);
123
124     }
125
126     @Override
127     public @NonNull ListenableFuture<EffectiveModelContext> createEffectiveModelContext(
128             final @NonNull Collection<SourceIdentifier> requiredSources) {
129         return createEffectiveModel(dedupSources(requiredSources));
130     }
131
132     @NonNull ListenableFuture<EffectiveModelContext> createEffectiveModel(final Set<SourceIdentifier> sources) {
133         final CacheEntry existing = cache.get(sources);
134         return existing != null ? acquireModel(sources, existing) : computeModel(sources);
135     }
136
137     // We may have an entry, but we do not know in what state it is in: it may be stable, it may be being built up
138     // or in process of being retired.
139     private @NonNull ListenableFuture<EffectiveModelContext> acquireModel(final Set<SourceIdentifier> sources,
140             final @NonNull CacheEntry entry) {
141         // Request a future from the entry, which indicates the context is either available or being constructed
142         final ListenableFuture<EffectiveModelContext> existing = entry.future();
143         if (existing != null) {
144             return existing;
145         }
146         // The entry cannot satisfy our request: remove it and fall back to computation
147         cache.remove(sources, entry);
148         return computeModel(sources);
149     }
150
151     private @NonNull ListenableFuture<EffectiveModelContext> computeModel(final Set<SourceIdentifier> sources) {
152         // Insert a new entry until we succeed or there is a workable entry
153         final CacheEntry ourEntry = new CacheEntry();
154         while (true) {
155             final CacheEntry prevEntry = cache.putIfAbsent(sources, ourEntry);
156             if (prevEntry == null) {
157                 // successful insert
158                 break;
159             }
160
161             // ... okay, we have raced, but is the entry still usable?
162             final ListenableFuture<EffectiveModelContext> existing = prevEntry.future();
163             if (existing != null) {
164                 // .. yup, we are done here
165                 return existing;
166             }
167
168             // ... no dice, remove the entry and retry
169             cache.remove(sources, prevEntry);
170         }
171
172         // Acquire the future first, then kick off computation. That way we do not need to worry about races around
173         // EffectiveModelContext being garbage-collected just after have computed it and before we have acquired a
174         // reference to it.
175         final ListenableFuture<EffectiveModelContext> result = ourEntry.getFuture();
176         resolveEntry(sources, ourEntry);
177         return result;
178     }
179
180     private void resolveEntry(final Set<SourceIdentifier> sources, final CacheEntry entry) {
181         LOG.debug("Starting assembly of {} sources", sources.size());
182         final Stopwatch sw = Stopwatch.createStarted();
183
184         // Request all sources be loaded
185         ListenableFuture<List<IRSchemaSource>> sf = Futures.allAsList(Collections2.transform(sources,
186             identifier -> repository.getSchemaSource(identifier, IRSchemaSource.class)));
187
188         // Detect mismatch between requested Source IDs and IDs that are extracted from parsed source
189         // Also remove duplicates if present
190         // We are relying on preserved order of uniqueSourceIdentifiers as well as sf
191         sf = Futures.transform(sf, new SourceIdMismatchDetector(sources), MoreExecutors.directExecutor());
192
193         // Assemble sources into a schema context
194         final ListenableFuture<EffectiveModelContext> cf = Futures.transformAsync(sf, assembleSources,
195             MoreExecutors.directExecutor());
196
197         // FIXME: we do not deal with invalidation here. We should monitor the repository for changes in source schemas
198         //        and react appropriately:
199         //        - in case we failed certainly want to invalidate the entry
200         //        - in case of success ... that's something to consider
201         Futures.addCallback(cf, new FutureCallback<EffectiveModelContext>() {
202             @Override
203             public void onSuccess(final EffectiveModelContext result) {
204                 LOG.debug("Finished assembly of {} sources in {}", sources.size(), sw);
205
206                 // Remove the entry when the context is GC'd
207                 final Stopwatch residence = Stopwatch.createStarted();
208                 CLEANER.register(result, () -> {
209                     LOG.debug("Removing entry after {}", residence);
210                     cache.remove(sources, entry);
211                 });
212
213                 // Flip the entry to resolved
214                 entry.resolve(result);
215             }
216
217             @Override
218             public void onFailure(final Throwable cause) {
219                 LOG.debug("Failed assembly of {} in {}", sources, sw, cause);
220                 entry.getFuture().setException(cause);
221             }
222         }, MoreExecutors.directExecutor());
223     }
224
225     /**
226      * Return a set of de-duplicated inputs.
227      *
228      * @return set (preserving ordering) from the input collection
229      */
230     private static ImmutableSet<SourceIdentifier> dedupSources(final Collection<SourceIdentifier> sources) {
231         final ImmutableSet<SourceIdentifier> result = ImmutableSet.copyOf(sources);
232         if (result.size() != sources.size()) {
233             LOG.warn("Duplicate sources requested for schema context, removed duplicate sources: {}",
234                 Collections2.filter(result, input -> Iterables.frequency(sources, input) > 1));
235         }
236         return result;
237     }
238 }