BUG-8618: make sure we refresh backend info
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / databroker / actors / dds / ModuleShardBackendResolver.java
index dde2292241ef55344e64bbd2793a03038c13bfa3..f0775ba289fcae321fce46132290e9f32faaa6a1 100644 (file)
@@ -69,21 +69,49 @@ final class ModuleShardBackendResolver extends AbstractShardBackendResolver {
         return cookie;
     }
 
-    private ShardState resolveBackendInfo(final Long cookie) {
+
+    @Override
+    public CompletionStage<ShardBackendInfo> getBackendInfo(final Long cookie) {
+        /*
+         * We cannot perform a simple computeIfAbsent() here because we need to control sequencing of when the state
+         * is inserted into the map and retired from it (based on the stage result).
+         *
+         * We do not want to hook another stage one processing completes and hooking a removal on failure from a compute
+         * method runs the inherent risk of stage completing before the insertion does (i.e. we have a removal of
+         * non-existent element.
+         */
+        final ShardState existing = backends.get(cookie);
+        if (existing != null) {
+            return existing.getStage();
+        }
+
         final String shardName = shards.inverse().get(cookie);
         if (shardName == null) {
             LOG.warn("Failing request for non-existent cookie {}", cookie);
-            return null;
+            throw new IllegalArgumentException("Cookie " + cookie + " does not have a shard assigned");
         }
 
         LOG.debug("Resolving cookie {} to shard {}", cookie, shardName);
+        final ShardState toInsert = resolveBackendInfo(shardName, cookie);
 
-        return resolveBackendInfo(shardName, cookie);
-    }
+        final ShardState raced = backends.putIfAbsent(cookie, toInsert);
+        if (raced != null) {
+            // We have had a concurrent insertion, return that
+            LOG.debug("Race during insertion of state for cookie {} shard {}", cookie, shardName);
+            return raced.getStage();
+        }
 
-    @Override
-    public CompletionStage<ShardBackendInfo> getBackendInfo(final Long cookie) {
-        return backends.computeIfAbsent(cookie, this::resolveBackendInfo).getStage();
+        // We have succeeded in populating the map, now we need to take care of pruning the entry if it fails to
+        // complete
+        final CompletionStage<ShardBackendInfo> stage = toInsert.getStage();
+        stage.whenComplete((info, failure) -> {
+            if (failure != null) {
+                LOG.debug("Resolution of cookie {} shard {} failed, removing state", cookie, shardName, failure);
+                backends.remove(cookie, toInsert);
+            }
+        });
+
+        return stage;
     }
 
     @Override