import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.SettableFuture;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
+import java.util.concurrent.ExecutionException;
import javax.annotation.Nonnull;
import org.antlr.v4.runtime.ParserRuleContext;
import org.opendaylight.yangtools.antlrv4.code.gen.YangStatementParser.StatementContext;
final ListenableFuture<SchemaContext> cf = Futures.transformAsync(sf, assembleSources,
MoreExecutors.directExecutor());
- // Populate cache when successful
+ final SettableFuture<SchemaContext> rf = SettableFuture.create();
Futures.addCallback(cf, new FutureCallback<SchemaContext>() {
@Override
public void onSuccess(final SchemaContext result) {
- cache.put(uniqueSourceIdentifiers, result);
+ // Deduplicate concurrent loads
+ final SchemaContext existing;
+ try {
+ existing = cache.get(uniqueSourceIdentifiers, () -> result);
+ } catch (ExecutionException e) {
+ LOG.warn("Failed to recheck result with cache, will use computed value", e);
+ rf.set(result);
+ return;
+ }
+
+ rf.set(existing);
}
@Override
public void onFailure(@Nonnull final Throwable cause) {
LOG.debug("Failed to assemble sources", cause);
+ rf.setException(cause);
}
}, MoreExecutors.directExecutor());
- return cf;
+ return rf;
}
private ListenableFuture<ASTSchemaSource> requestSource(final SourceIdentifier identifier) {