BUG-8618: make sure we refresh backend info 89/58989/2
authorRobert Varga <robert.varga@pantheon.tech>
Wed, 14 Jun 2017 10:38:20 +0000 (12:38 +0200)
committerRobert Varga <nite@hq.sk>
Thu, 15 Jun 2017 06:55:11 +0000 (06:55 +0000)
When we are performing a reconnection attempt we must never use
previous backend info, but rather have to refresh it.

Fix this by removing state when resolution fails.

Change-Id: I65592f2101547a606a15d9c8030c7d8c58afe8a5
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
(cherry picked from commit a816f13940862759cefcd6e69330c6a70b512ee2)

opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ModuleShardBackendResolver.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/SimpleShardBackendResolver.java

index dde2292241ef55344e64bbd2793a03038c13bfa3..f0775ba289fcae321fce46132290e9f32faaa6a1 100644 (file)
@@ -69,21 +69,49 @@ final class ModuleShardBackendResolver extends AbstractShardBackendResolver {
         return cookie;
     }
 
-    private ShardState resolveBackendInfo(final Long cookie) {
+
+    @Override
+    public CompletionStage<ShardBackendInfo> getBackendInfo(final Long cookie) {
+        /*
+         * We cannot perform a simple computeIfAbsent() here because we need to control sequencing of when the state
+         * is inserted into the map and retired from it (based on the stage result).
+         *
+         * We do not want to hook another stage one processing completes and hooking a removal on failure from a compute
+         * method runs the inherent risk of stage completing before the insertion does (i.e. we have a removal of
+         * non-existent element.
+         */
+        final ShardState existing = backends.get(cookie);
+        if (existing != null) {
+            return existing.getStage();
+        }
+
         final String shardName = shards.inverse().get(cookie);
         if (shardName == null) {
             LOG.warn("Failing request for non-existent cookie {}", cookie);
-            return null;
+            throw new IllegalArgumentException("Cookie " + cookie + " does not have a shard assigned");
         }
 
         LOG.debug("Resolving cookie {} to shard {}", cookie, shardName);
+        final ShardState toInsert = resolveBackendInfo(shardName, cookie);
 
-        return resolveBackendInfo(shardName, cookie);
-    }
+        final ShardState raced = backends.putIfAbsent(cookie, toInsert);
+        if (raced != null) {
+            // We have had a concurrent insertion, return that
+            LOG.debug("Race during insertion of state for cookie {} shard {}", cookie, shardName);
+            return raced.getStage();
+        }
 
-    @Override
-    public CompletionStage<ShardBackendInfo> getBackendInfo(final Long cookie) {
-        return backends.computeIfAbsent(cookie, this::resolveBackendInfo).getStage();
+        // We have succeeded in populating the map, now we need to take care of pruning the entry if it fails to
+        // complete
+        final CompletionStage<ShardBackendInfo> stage = toInsert.getStage();
+        stage.whenComplete((info, failure) -> {
+            if (failure != null) {
+                LOG.debug("Resolution of cookie {} shard {} failed, removing state", cookie, shardName, failure);
+                backends.remove(cookie, toInsert);
+            }
+        });
+
+        return stage;
     }
 
     @Override
index 056a1ea7e226deb49a6ee12e916769bf7e6dbb0c..370484d9556f03d8eb677c20f321d9b5fdcb2458 100644 (file)
@@ -41,18 +41,33 @@ final class SimpleShardBackendResolver extends AbstractShardBackendResolver {
     private CompletionStage<ShardBackendInfo> getBackendInfo(final long cookie) {
         Preconditions.checkArgument(cookie == 0);
 
-        ShardState local = state;
-        if (local == null) {
-            synchronized (this) {
-                local = state;
-                if (local == null) {
-                    local = resolveBackendInfo(shardName, 0);
-                    state = local;
-                }
-            }
+        final ShardState existing = state;
+        if (existing != null) {
+            return existing.getStage();
         }
 
-        return local.getStage();
+        synchronized (this) {
+            final ShardState recheck = state;
+            if (recheck != null) {
+                return recheck.getStage();
+            }
+
+            final ShardState newState = resolveBackendInfo(shardName, 0);
+            state = newState;
+
+            final CompletionStage<ShardBackendInfo> stage = newState.getStage();
+            stage.whenComplete((info, failure) -> {
+                if (failure != null) {
+                    synchronized (SimpleShardBackendResolver.this) {
+                        if (state == newState) {
+                            state = null;
+                        }
+                    }
+                }
+            });
+
+            return stage;
+        }
     }
 
     @Override