X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatabroker%2Factors%2Fdds%2FModuleShardBackendResolver.java;h=f0775ba289fcae321fce46132290e9f32faaa6a1;hp=dde2292241ef55344e64bbd2793a03038c13bfa3;hb=refs%2Fchanges%2F89%2F58989%2F2;hpb=5f7ae4a949dc2fc78174e670697a78598678443c diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ModuleShardBackendResolver.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ModuleShardBackendResolver.java index dde2292241..f0775ba289 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ModuleShardBackendResolver.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ModuleShardBackendResolver.java @@ -69,21 +69,49 @@ final class ModuleShardBackendResolver extends AbstractShardBackendResolver { return cookie; } - private ShardState resolveBackendInfo(final Long cookie) { + + @Override + public CompletionStage getBackendInfo(final Long cookie) { + /* + * We cannot perform a simple computeIfAbsent() here because we need to control sequencing of when the state + * is inserted into the map and retired from it (based on the stage result). + * + * We do not want to hook another stage one processing completes and hooking a removal on failure from a compute + * method runs the inherent risk of stage completing before the insertion does (i.e. we have a removal of + * non-existent element. + */ + final ShardState existing = backends.get(cookie); + if (existing != null) { + return existing.getStage(); + } + final String shardName = shards.inverse().get(cookie); if (shardName == null) { LOG.warn("Failing request for non-existent cookie {}", cookie); - return null; + throw new IllegalArgumentException("Cookie " + cookie + " does not have a shard assigned"); } LOG.debug("Resolving cookie {} to shard {}", cookie, shardName); + final ShardState toInsert = resolveBackendInfo(shardName, cookie); - return resolveBackendInfo(shardName, cookie); - } + final ShardState raced = backends.putIfAbsent(cookie, toInsert); + if (raced != null) { + // We have had a concurrent insertion, return that + LOG.debug("Race during insertion of state for cookie {} shard {}", cookie, shardName); + return raced.getStage(); + } - @Override - public CompletionStage getBackendInfo(final Long cookie) { - return backends.computeIfAbsent(cookie, this::resolveBackendInfo).getStage(); + // We have succeeded in populating the map, now we need to take care of pruning the entry if it fails to + // complete + final CompletionStage stage = toInsert.getStage(); + stage.whenComplete((info, failure) -> { + if (failure != null) { + LOG.debug("Resolution of cookie {} shard {} failed, removing state", cookie, shardName, failure); + backends.remove(cookie, toInsert); + } + }); + + return stage; } @Override