return cookie;
}
- private ShardState resolveBackendInfo(final Long cookie) {
+
+ @Override
+ public CompletionStage<ShardBackendInfo> getBackendInfo(final Long cookie) {
+ /*
+ * We cannot perform a simple computeIfAbsent() here because we need to control sequencing of when the state
+ * is inserted into the map and retired from it (based on the stage result).
+ *
+ * We do not want to hook another stage one processing completes and hooking a removal on failure from a compute
+ * method runs the inherent risk of stage completing before the insertion does (i.e. we have a removal of
+ * non-existent element.
+ */
+ final ShardState existing = backends.get(cookie);
+ if (existing != null) {
+ return existing.getStage();
+ }
+
final String shardName = shards.inverse().get(cookie);
if (shardName == null) {
LOG.warn("Failing request for non-existent cookie {}", cookie);
- return null;
+ throw new IllegalArgumentException("Cookie " + cookie + " does not have a shard assigned");
}
LOG.debug("Resolving cookie {} to shard {}", cookie, shardName);
+ final ShardState toInsert = resolveBackendInfo(shardName, cookie);
- return resolveBackendInfo(shardName, cookie);
- }
+ final ShardState raced = backends.putIfAbsent(cookie, toInsert);
+ if (raced != null) {
+ // We have had a concurrent insertion, return that
+ LOG.debug("Race during insertion of state for cookie {} shard {}", cookie, shardName);
+ return raced.getStage();
+ }
- @Override
- public CompletionStage<ShardBackendInfo> getBackendInfo(final Long cookie) {
- return backends.computeIfAbsent(cookie, this::resolveBackendInfo).getStage();
+ // We have succeeded in populating the map, now we need to take care of pruning the entry if it fails to
+ // complete
+ final CompletionStage<ShardBackendInfo> stage = toInsert.getStage();
+ stage.whenComplete((info, failure) -> {
+ if (failure != null) {
+ LOG.debug("Resolution of cookie {} shard {} failed, removing state", cookie, shardName, failure);
+ backends.remove(cookie, toInsert);
+ }
+ });
+
+ return stage;
}
@Override
private CompletionStage<ShardBackendInfo> getBackendInfo(final long cookie) {
Preconditions.checkArgument(cookie == 0);
- ShardState local = state;
- if (local == null) {
- synchronized (this) {
- local = state;
- if (local == null) {
- local = resolveBackendInfo(shardName, 0);
- state = local;
- }
- }
+ final ShardState existing = state;
+ if (existing != null) {
+ return existing.getStage();
}
- return local.getStage();
+ synchronized (this) {
+ final ShardState recheck = state;
+ if (recheck != null) {
+ return recheck.getStage();
+ }
+
+ final ShardState newState = resolveBackendInfo(shardName, 0);
+ state = newState;
+
+ final CompletionStage<ShardBackendInfo> stage = newState.getStage();
+ stage.whenComplete((info, failure) -> {
+ if (failure != null) {
+ synchronized (SimpleShardBackendResolver.this) {
+ if (state == newState) {
+ state = null;
+ }
+ }
+ }
+ });
+
+ return stage;
+ }
}
@Override