2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
3 * Copyright (c) 2021 PANTHEON.tech, s.r.o.
5 * This program and the accompanying materials are made available under the
6 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
7 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.yangtools.yang.parser.repo;
11 import static com.google.common.base.Verify.verify;
12 import static java.util.Objects.requireNonNull;
14 import com.google.common.base.Stopwatch;
15 import com.google.common.collect.Collections2;
16 import com.google.common.collect.ImmutableSet;
17 import com.google.common.collect.Iterables;
18 import com.google.common.util.concurrent.FutureCallback;
19 import com.google.common.util.concurrent.Futures;
20 import com.google.common.util.concurrent.ListenableFuture;
21 import com.google.common.util.concurrent.MoreExecutors;
22 import com.google.common.util.concurrent.SettableFuture;
23 import java.lang.invoke.MethodHandles;
24 import java.lang.invoke.VarHandle;
25 import java.lang.ref.Cleaner;
26 import java.lang.ref.Reference;
27 import java.lang.ref.SoftReference;
28 import java.lang.ref.WeakReference;
29 import java.util.Collection;
30 import java.util.List;
32 import java.util.concurrent.ConcurrentHashMap;
33 import java.util.concurrent.ConcurrentMap;
34 import java.util.function.Function;
35 import org.eclipse.jdt.annotation.NonNull;
36 import org.eclipse.jdt.annotation.Nullable;
37 import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
38 import org.opendaylight.yangtools.yang.model.repo.api.EffectiveModelContextFactory;
39 import org.opendaylight.yangtools.yang.model.repo.api.SchemaContextFactoryConfiguration;
40 import org.opendaylight.yangtools.yang.model.repo.api.SchemaRepository;
41 import org.opendaylight.yangtools.yang.model.repo.api.SourceIdentifier;
42 import org.opendaylight.yangtools.yang.parser.rfc7950.ir.IRSchemaSource;
43 import org.slf4j.Logger;
44 import org.slf4j.LoggerFactory;
47 * An almost-simple cache. EffectiveModel computation is explicitly asynchronous and we are also threadless, i.e. we
48 * hijack repository threads to do our work.
50 final class SharedEffectiveModelContextFactory implements EffectiveModelContextFactory {
51 private static final class CacheEntry {
52 private static final Function<EffectiveModelContext, Reference<EffectiveModelContext>> REF;
53 private static final VarHandle STATE;
57 STATE = MethodHandles.lookup().findVarHandle(CacheEntry.class, "state", Object.class);
58 } catch (NoSuchFieldException | IllegalAccessException e) {
59 throw new ExceptionInInitializerError(e);
62 String prop = System.getProperty("org.opendaylight.yangtools.yang.parser.repo.shared-refs", "weak");
65 REF = SoftReference::new;
68 REF = WeakReference::new;
71 LOG.warn("Invalid shared-refs \"{}\", defaulting to weak references", prop);
73 REF = WeakReference::new;
75 LOG.info("Using {} references", prop);
78 // This field can be in one of two states:
79 // - SettableFuture, in which case the model is being computed
80 // - Reference, in which case the model is available through the reference (unless cleared)
81 @SuppressWarnings("unused")
82 private volatile Object state = SettableFuture.create();
84 @SuppressWarnings("unchecked")
85 @Nullable ListenableFuture<EffectiveModelContext> future() {
86 final Object local = STATE.getAcquire(this);
87 if (local instanceof SettableFuture) {
88 return (SettableFuture<EffectiveModelContext>) local;
90 verify(local instanceof Reference, "Unexpected state %s", local);
91 final EffectiveModelContext model = ((Reference<EffectiveModelContext>) local).get();
92 return model == null ? null : Futures.immediateFuture(model);
95 @SuppressWarnings("unchecked")
96 @NonNull SettableFuture<EffectiveModelContext> getFuture() {
97 final Object local = STATE.getAcquire(this);
98 verify(local instanceof SettableFuture, "Unexpected state %s", local);
99 return (SettableFuture<EffectiveModelContext>) local;
102 void resolve(final EffectiveModelContext context) {
103 final SettableFuture<EffectiveModelContext> future = getFuture();
104 // Publish a weak reference before triggering any listeners on the future so that newcomers can see it
105 final Object witness = STATE.compareAndExchangeRelease(this, future, REF.apply(context));
106 verify(witness == future, "Unexpected witness %s", witness);
112 private static final Logger LOG = LoggerFactory.getLogger(SharedEffectiveModelContextFactory.class);
113 private static final Cleaner CLEANER = Cleaner.create();
115 private final ConcurrentMap<Set<SourceIdentifier>, CacheEntry> cache = new ConcurrentHashMap<>();
116 private final AssembleSources assembleSources;
117 private final SchemaRepository repository;
119 SharedEffectiveModelContextFactory(final @NonNull SharedSchemaRepository repository,
120 final @NonNull SchemaContextFactoryConfiguration config) {
121 this.repository = requireNonNull(repository);
122 this.assembleSources = new AssembleSources(repository.factory(), config);
127 public @NonNull ListenableFuture<EffectiveModelContext> createEffectiveModelContext(
128 final @NonNull Collection<SourceIdentifier> requiredSources) {
129 return createEffectiveModel(dedupSources(requiredSources));
132 @NonNull ListenableFuture<EffectiveModelContext> createEffectiveModel(final Set<SourceIdentifier> sources) {
133 final CacheEntry existing = cache.get(sources);
134 return existing != null ? acquireModel(sources, existing) : computeModel(sources);
137 // We may have an entry, but we do not know in what state it is in: it may be stable, it may be being built up
138 // or in process of being retired.
139 private @NonNull ListenableFuture<EffectiveModelContext> acquireModel(final Set<SourceIdentifier> sources,
140 final @NonNull CacheEntry entry) {
141 // Request a future from the entry, which indicates the context is either available or being constructed
142 final ListenableFuture<EffectiveModelContext> existing = entry.future();
143 if (existing != null) {
146 // The entry cannot satisfy our request: remove it and fall back to computation
147 cache.remove(sources, entry);
148 return computeModel(sources);
151 private @NonNull ListenableFuture<EffectiveModelContext> computeModel(final Set<SourceIdentifier> sources) {
152 // Insert a new entry until we succeed or there is a workable entry
153 final CacheEntry ourEntry = new CacheEntry();
155 final CacheEntry prevEntry = cache.putIfAbsent(sources, ourEntry);
156 if (prevEntry == null) {
161 // ... okay, we have raced, but is the entry still usable?
162 final ListenableFuture<EffectiveModelContext> existing = prevEntry.future();
163 if (existing != null) {
164 // .. yup, we are done here
168 // ... no dice, remove the entry and retry
169 cache.remove(sources, prevEntry);
172 // Acquire the future first, then kick off computation. That way we do not need to worry about races around
173 // EffectiveModelContext being garbage-collected just after have computed it and before we have acquired a
175 final ListenableFuture<EffectiveModelContext> result = ourEntry.getFuture();
176 resolveEntry(sources, ourEntry);
180 private void resolveEntry(final Set<SourceIdentifier> sources, final CacheEntry entry) {
181 LOG.debug("Starting assembly of {} sources", sources.size());
182 final Stopwatch sw = Stopwatch.createStarted();
184 // Request all sources be loaded
185 ListenableFuture<List<IRSchemaSource>> sf = Futures.allAsList(Collections2.transform(sources,
186 identifier -> repository.getSchemaSource(identifier, IRSchemaSource.class)));
188 // Detect mismatch between requested Source IDs and IDs that are extracted from parsed source
189 // Also remove duplicates if present
190 // We are relying on preserved order of uniqueSourceIdentifiers as well as sf
191 sf = Futures.transform(sf, new SourceIdMismatchDetector(sources), MoreExecutors.directExecutor());
193 // Assemble sources into a schema context
194 final ListenableFuture<EffectiveModelContext> cf = Futures.transformAsync(sf, assembleSources,
195 MoreExecutors.directExecutor());
197 // FIXME: we do not deal with invalidation here. We should monitor the repository for changes in source schemas
198 // and react appropriately:
199 // - in case we failed certainly want to invalidate the entry
200 // - in case of success ... that's something to consider
201 Futures.addCallback(cf, new FutureCallback<EffectiveModelContext>() {
203 public void onSuccess(final EffectiveModelContext result) {
204 LOG.debug("Finished assembly of {} sources in {}", sources.size(), sw);
206 // Remove the entry when the context is GC'd
207 final Stopwatch residence = Stopwatch.createStarted();
208 CLEANER.register(result, () -> {
209 LOG.debug("Removing entry after {}", residence);
210 cache.remove(sources, entry);
213 // Flip the entry to resolved
214 entry.resolve(result);
218 public void onFailure(final Throwable cause) {
219 LOG.debug("Failed assembly of {} in {}", sources, sw, cause);
220 entry.getFuture().setException(cause);
222 }, MoreExecutors.directExecutor());
226 * Return a set of de-duplicated inputs.
228 * @return set (preserving ordering) from the input collection
230 private static ImmutableSet<SourceIdentifier> dedupSources(final Collection<SourceIdentifier> sources) {
231 final ImmutableSet<SourceIdentifier> result = ImmutableSet.copyOf(sources);
232 if (result.size() != sources.size()) {
233 LOG.warn("Duplicate sources requested for schema context, removed duplicate sources: {}",
234 Collections2.filter(result, input -> Iterables.frequency(sources, input) > 1));