2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
3 * Copyright (c) 2021 PANTHEON.tech, s.r.o.
5 * This program and the accompanying materials are made available under the
6 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
7 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.yangtools.yang.parser.repo;
11 import static com.google.common.base.Verify.verify;
12 import static java.util.Objects.requireNonNull;
14 import com.google.common.base.Stopwatch;
15 import com.google.common.collect.Collections2;
16 import com.google.common.collect.ImmutableSet;
17 import com.google.common.collect.Iterables;
18 import com.google.common.util.concurrent.FutureCallback;
19 import com.google.common.util.concurrent.Futures;
20 import com.google.common.util.concurrent.ListenableFuture;
21 import com.google.common.util.concurrent.MoreExecutors;
22 import com.google.common.util.concurrent.SettableFuture;
23 import java.lang.invoke.MethodHandles;
24 import java.lang.invoke.VarHandle;
25 import java.lang.ref.Cleaner;
26 import java.lang.ref.Reference;
27 import java.lang.ref.SoftReference;
28 import java.lang.ref.WeakReference;
29 import java.util.Collection;
30 import java.util.List;
32 import java.util.concurrent.ConcurrentHashMap;
33 import java.util.concurrent.ConcurrentMap;
34 import java.util.function.Function;
35 import org.eclipse.jdt.annotation.NonNull;
36 import org.eclipse.jdt.annotation.Nullable;
37 import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
38 import org.opendaylight.yangtools.yang.model.repo.api.EffectiveModelContextFactory;
39 import org.opendaylight.yangtools.yang.model.repo.api.SchemaContextFactoryConfiguration;
40 import org.opendaylight.yangtools.yang.model.repo.api.SchemaRepository;
41 import org.opendaylight.yangtools.yang.model.repo.api.SourceIdentifier;
42 import org.opendaylight.yangtools.yang.model.repo.api.YangIRSchemaSource;
43 import org.slf4j.Logger;
44 import org.slf4j.LoggerFactory;
47 * An almost-simple cache. EffectiveModel computation is explicitly asynchronous and we are also threadless, i.e. we
48 * hijack repository threads to do our work.
50 final class SharedEffectiveModelContextFactory implements EffectiveModelContextFactory {
// A single cache slot. Its 'state' field starts out as the in-flight SettableFuture and is later swapped for a
// Reference to the computed model by resolve().
51 private static final class CacheEntry {
// Factory for the Reference published by resolve(): either SoftReference::new or WeakReference::new.
52 private static final Function<EffectiveModelContext, Reference<EffectiveModelContext>> REF;
// Handle on the 'state' field, used for the acquire reads and the compare-and-exchange below.
53 private static final VarHandle STATE;
57 STATE = MethodHandles.lookup().findVarHandle(CacheEntry.class, "state", Object.class);
58 } catch (NoSuchFieldException | IllegalAccessException e) {
// A failed lookup is surfaced as a class initialization error.
59 throw new ExceptionInInitializerError(e);
// The reference strength is read from a system property which defaults to "weak". Any value other than "soft" or
// "weak" is logged as invalid and weak references are used instead.
// NOTE(review): the statement assigning REF is not visible in this excerpt -- presumably it is the result of the
// switch over 'prop' below; confirm against the full file.
62 String prop = System.getProperty("org.opendaylight.yangtools.yang.parser.repo.shared-refs", "weak");
64 case "soft" -> SoftReference::new;
65 case "weak" -> WeakReference::new;
67 LOG.warn("Invalid shared-refs \"{}\", defaulting to weak references", prop);
69 yield WeakReference::new;
72 LOG.info("Using {} references", prop);
75 // This field can be in one of two states:
76 // - SettableFuture, in which case the model is being computed
77 // - Reference, in which case the model is available through the reference (unless cleared)
// The field is only accessed through the STATE VarHandle, hence the "unused" suppression.
78 @SuppressWarnings("unused")
79 private volatile Object state = SettableFuture.create();
// Returns a future for this entry's model: the in-flight future while the model is being computed, an immediate
// future once the model is available, or null when the reference has been cleared.
81 @SuppressWarnings("unchecked")
82 @Nullable ListenableFuture<EffectiveModelContext> future() {
83 final Object local = STATE.getAcquire(this);
84 if (local instanceof SettableFuture) {
85 return (SettableFuture<EffectiveModelContext>) local;
87 verify(local instanceof Reference, "Unexpected state %s", local);
// The referent may have been cleared in the meantime, in which case the caller gets null.
88 final EffectiveModelContext model = ((Reference<EffectiveModelContext>) local).get();
89 return model == null ? null : Futures.immediateFuture(model);
// Returns the in-flight future. This is only valid while the entry has not been resolved: the verify() below fails
// once the state has been flipped to a Reference.
92 @SuppressWarnings("unchecked")
93 @NonNull SettableFuture<EffectiveModelContext> getFuture() {
94 final Object local = STATE.getAcquire(this);
95 verify(local instanceof SettableFuture, "Unexpected state %s", local);
96 return (SettableFuture<EffectiveModelContext>) local;
// Flips this entry from "being computed" to "available" by replacing the in-flight future with a Reference to the
// supplied context.
99 void resolve(final EffectiveModelContext context) {
100 final SettableFuture<EffectiveModelContext> future = getFuture();
101 // Publish the reference (weak or soft, as selected by REF) before triggering any listeners on the future so that newcomers can see it
102 final Object witness = STATE.compareAndExchangeRelease(this, future, REF.apply(context));
// The exchange must have observed our own future, otherwise somebody else has modified the state.
103 verify(witness == future, "Unexpected witness %s", witness);
109 private static final Logger LOG = LoggerFactory.getLogger(SharedEffectiveModelContextFactory.class);
// Used by resolveEntry() to remove a cache entry once its EffectiveModelContext has been garbage-collected.
110 private static final Cleaner CLEANER = Cleaner.create();
// Entries keyed by the set of requested sources; each entry is either being computed or refers to a computed model.
112 private final ConcurrentMap<Set<SourceIdentifier>, CacheEntry> cache = new ConcurrentHashMap<>();
// Asynchronous transformation used by resolveEntry() to turn loaded sources into an EffectiveModelContext.
113 private final AssembleSources assembleSources;
// Repository from which resolveEntry() requests YangIRSchemaSource instances.
114 private final SchemaRepository repository;
// Creates a factory on top of the supplied repository. The repository must not be null; the source assembler is
// built from the repository's factory() and the supplied configuration.
116 SharedEffectiveModelContextFactory(final @NonNull SharedSchemaRepository repository,
117 final @NonNull SchemaContextFactoryConfiguration config) {
118 this.repository = requireNonNull(repository);
119 assembleSources = new AssembleSources(repository.factory(), config);
// Public entry point: de-duplicates the requested sources via dedupSources() and delegates to
// createEffectiveModel() with the resulting set.
124 public @NonNull ListenableFuture<EffectiveModelContext> createEffectiveModelContext(
125 final @NonNull Collection<SourceIdentifier> requiredSources) {
126 return createEffectiveModel(dedupSources(requiredSources));
// Looks the source set up in the cache: an existing entry is handed to acquireModel(), a miss goes straight to
// computeModel().
129 @NonNull ListenableFuture<EffectiveModelContext> createEffectiveModel(final Set<SourceIdentifier> sources) {
130 final CacheEntry existing = cache.get(sources);
131 return existing != null ? acquireModel(sources, existing) : computeModel(sources);
134 // We may have an entry, but we do not know what state it is in: it may be stable, it may be being built up
135 // or in the process of being retired.
136 private @NonNull ListenableFuture<EffectiveModelContext> acquireModel(final Set<SourceIdentifier> sources,
137 final @NonNull CacheEntry entry) {
138 // Request a future from the entry, which indicates the context is either available or being constructed
139 final ListenableFuture<EffectiveModelContext> existing = entry.future();
// NOTE(review): the body of this branch is not visible in this excerpt -- presumably it returns 'existing'.
140 if (existing != null) {
143 // The entry cannot satisfy our request: remove it and fall back to computation
// The two-argument remove() only evicts the mapping if it still points at this very entry.
144 cache.remove(sources, entry);
145 return computeModel(sources);
// Installs a fresh entry for the source set and starts its computation, unless a concurrent caller has already
// installed an entry which can still provide a future.
// NOTE(review): the enclosing retry loop, the branch bodies and the return statements are not visible in this
// excerpt; the description follows the visible statements and comments only.
148 private @NonNull ListenableFuture<EffectiveModelContext> computeModel(final Set<SourceIdentifier> sources) {
149 // Insert a new entry until we succeed or there is a workable entry
150 final CacheEntry ourEntry = new CacheEntry();
152 final CacheEntry prevEntry = cache.putIfAbsent(sources, ourEntry);
// A null result means our entry has been installed and no other entry was present.
153 if (prevEntry == null) {
158 // ... okay, we have raced, but is the entry still usable?
159 final ListenableFuture<EffectiveModelContext> existing = prevEntry.future();
160 if (existing != null) {
161 // .. yup, we are done here
165 // ... no dice, remove the entry and retry
166 cache.remove(sources, prevEntry);
169 // Acquire the future first, then kick off computation. That way we do not need to worry about races around
170 // EffectiveModelContext being garbage-collected just after we have computed it and before we have acquired a
// reference to it.
172 final ListenableFuture<EffectiveModelContext> result = ourEntry.getFuture();
173 resolveEntry(sources, ourEntry);
// Builds the asynchronous pipeline which computes the model for an entry: load every source from the repository,
// check the loaded sources against the requested identifiers, assemble them, and finally either resolve the entry
// or fail its future. Every stage runs on MoreExecutors.directExecutor().
177 private void resolveEntry(final Set<SourceIdentifier> sources, final CacheEntry entry) {
178 LOG.debug("Starting assembly of {} sources", sources.size());
179 final Stopwatch sw = Stopwatch.createStarted();
181 // Request all sources be loaded
182 ListenableFuture<List<YangIRSchemaSource>> sf = Futures.allAsList(Collections2.transform(sources,
183 identifier -> repository.getSchemaSource(identifier, YangIRSchemaSource.class)));
185 // Detect mismatch between requested Source IDs and IDs that are extracted from parsed source
186 // Also remove duplicates if present
187 // We are relying on preserved order of uniqueSourceIdentifiers as well as sf
// NOTE(review): nothing named 'uniqueSourceIdentifiers' is visible here -- presumably this refers to 'sources'.
188 sf = Futures.transform(sf, new SourceIdMismatchDetector(sources), MoreExecutors.directExecutor());
190 // Assemble sources into a schema context
191 final ListenableFuture<EffectiveModelContext> cf = Futures.transformAsync(sf, assembleSources,
192 MoreExecutors.directExecutor());
194 // FIXME: we do not deal with invalidation here. We should monitor the repository for changes in source schemas
195 // and react appropriately:
196 // - in case we failed we certainly want to invalidate the entry
197 // - in case of success ... that's something to consider
198 Futures.addCallback(cf, new FutureCallback<EffectiveModelContext>() {
200 public void onSuccess(final EffectiveModelContext result) {
201 LOG.debug("Finished assembly of {} sources in {}", sources.size(), sw);
203 // Remove the entry when the context is GC'd
// 'residence' measures how long the model stayed around; it is only reported by the cleanup action.
204 final Stopwatch residence = Stopwatch.createStarted();
205 CLEANER.register(result, () -> {
206 LOG.debug("Removing entry after {}", residence);
207 cache.remove(sources, entry);
210 // Flip the entry to resolved
211 entry.resolve(result);
215 public void onFailure(final Throwable cause) {
216 LOG.debug("Failed assembly of {} in {}", sources, sw, cause);
// Propagate the failure to everybody holding the entry's in-flight future.
217 entry.getFuture().setException(cause);
219 // remove failed result from the cache so it can be recomputed, as this might have been a transient
// problem
221 cache.remove(sources, entry);
223 }, MoreExecutors.directExecutor());
227 * Return a set of de-duplicated inputs.
229 * @return set (preserving ordering) from the input collection
// Copies the requested sources into an ImmutableSet. When the copy is smaller than the input, a warning lists the
// sources which occur more than once in the input.
// NOTE(review): the end of this method is not visible in this excerpt -- presumably it returns 'result'.
231 private static ImmutableSet<SourceIdentifier> dedupSources(final Collection<SourceIdentifier> sources) {
232 final ImmutableSet<SourceIdentifier> result = ImmutableSet.copyOf(sources);
// A size mismatch means ImmutableSet.copyOf() has collapsed at least one duplicate.
233 if (result.size() != sources.size()) {
234 LOG.warn("Duplicate sources requested for schema context, removed duplicate sources: {}",
235 Collections2.filter(result, input -> Iterables.frequency(sources, input) > 1));