In case we are running concurrent SchemaContext computation, if
the result is not cached we will have multiple results. These
need to be reconciled with the cache, so that those results will
be squashed into a single instance.
JIRA: YANGTOOLS-1004
Change-Id: I82df2b32a534d0acc803548579fb8ad138853c68
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.SettableFuture;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
+import java.util.concurrent.ExecutionException;
import org.antlr.v4.runtime.ParserRuleContext;
import org.eclipse.jdt.annotation.NonNull;
import org.opendaylight.yangtools.antlrv4.code.gen.YangStatementParser.StatementContext;
import org.antlr.v4.runtime.ParserRuleContext;
import org.eclipse.jdt.annotation.NonNull;
import org.opendaylight.yangtools.antlrv4.code.gen.YangStatementParser.StatementContext;
final ListenableFuture<EffectiveModelContext> cf = Futures.transformAsync(sf, assembleSources,
MoreExecutors.directExecutor());
final ListenableFuture<EffectiveModelContext> cf = Futures.transformAsync(sf, assembleSources,
MoreExecutors.directExecutor());
- // Populate cache when successful
+ final SettableFuture<EffectiveModelContext> rf = SettableFuture.create();
Futures.addCallback(cf, new FutureCallback<EffectiveModelContext>() {
@Override
public void onSuccess(final EffectiveModelContext result) {
Futures.addCallback(cf, new FutureCallback<EffectiveModelContext>() {
@Override
public void onSuccess(final EffectiveModelContext result) {
- cache.put(uniqueSourceIdentifiers, result);
+ // Deduplicate concurrent loads
+ final EffectiveModelContext existing;
+ try {
+ existing = cache.get(uniqueSourceIdentifiers, () -> result);
+ } catch (ExecutionException e) {
+ LOG.warn("Failed to recheck result with cache, will use computed value", e);
+ rf.set(result);
+ return;
+ }
+
+ rf.set(existing);
}
@Override
public void onFailure(final Throwable cause) {
LOG.debug("Failed to assemble sources", cause);
}
@Override
public void onFailure(final Throwable cause) {
LOG.debug("Failed to assemble sources", cause);
+ rf.setException(cause);
}
}, MoreExecutors.directExecutor());
}
}, MoreExecutors.directExecutor());
}
private ListenableFuture<ASTSchemaSource> requestSource(final @NonNull SourceIdentifier identifier) {
}
private ListenableFuture<ASTSchemaSource> requestSource(final @NonNull SourceIdentifier identifier) {