Refactor DataStore readiness tracking 94/91794/2
authorRobert Varga <robert.varga@pantheon.tech>
Fri, 31 Jul 2020 09:34:22 +0000 (11:34 +0200)
committerRobert Varga <robert.varga@pantheon.tech>
Fri, 31 Jul 2020 10:56:18 +0000 (12:56 +0200)
Using a CountDownLatch is not composable, which leads to current
layout. Switch to using a SettableFuture, which can be accessed
via AbstractDataStore.initialSettleFuture().

This allows us to externalize the settle policy, letting callers
decide what to actually do.

JIRA: CONTROLLER-1882
Change-Id: Iaf9a359cfc2507ae35688fca3673c13713c2b427
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractDataStore.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/AbstractShardManagerCreator.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManager.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractShardManagerTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManagerTest.java

index d6958a7..95cd0ad 100644 (file)
@@ -17,9 +17,11 @@ import akka.actor.Props;
 import com.google.common.annotations.Beta;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Throwables;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
 import com.google.common.util.concurrent.Uninterruptibles;
 import java.util.Set;
-import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
@@ -60,7 +62,7 @@ public abstract class AbstractDataStore implements DistributedDataStoreInterface
 
     private static final Logger LOG = LoggerFactory.getLogger(AbstractDataStore.class);
 
-    private final CountDownLatch waitTillReadyCountDownLatch = new CountDownLatch(1);
+    private final SettableFuture<Void> readinessFuture = SettableFuture.create();
     private final ClientIdentifier identifier;
     private final DataStoreClient client;
     private final ActorUtils actorUtils;
@@ -90,7 +92,7 @@ public abstract class AbstractDataStore implements DistributedDataStoreInterface
 
         AbstractShardManagerCreator<?> creator = getShardManagerCreator().cluster(cluster).configuration(configuration)
                 .datastoreContextFactory(datastoreContextFactory)
-                .waitTillReadyCountDownLatch(waitTillReadyCountDownLatch)
+                .readinessFuture(readinessFuture)
                 .primaryShardInfoCache(primaryShardInfoCache)
                 .restoreFromSnapshot(restoreFromSnapshot)
                 .distributedDataStore(this);
@@ -248,6 +250,7 @@ public abstract class AbstractDataStore implements DistributedDataStoreInterface
     }
 
     // TODO: consider removing this in favor of awaitReadiness()
+    @Deprecated
     public void waitTillReady() {
         LOG.info("Beginning to wait for data store to become ready : {}", identifier);
 
@@ -266,23 +269,36 @@ public abstract class AbstractDataStore implements DistributedDataStoreInterface
     }
 
     @Beta
+    @Deprecated
     public boolean awaitReadiness() throws InterruptedException {
         return awaitReadiness(initialSettleTime());
     }
 
     @Beta
+    @Deprecated
     public boolean awaitReadiness(final Duration toWait) throws InterruptedException {
-        if (toWait.isFinite()) {
-            return waitTillReadyCountDownLatch.await(toWait.toNanos(), TimeUnit.NANOSECONDS);
+        try {
+            if (toWait.isFinite()) {
+                try {
+                    readinessFuture.get(toWait.toNanos(), TimeUnit.NANOSECONDS);
+                } catch (TimeoutException e) {
+                    LOG.debug("Timed out waiting for shards to settle", e);
+                    return false;
+                }
+            } else {
+                readinessFuture.get();
+            }
+        } catch (ExecutionException e) {
+            LOG.warn("Unexpected readiness failure, assuming convergence", e);
         }
 
-        waitTillReadyCountDownLatch.await();
         return true;
     }
 
     @Beta
+    @Deprecated
     public void awaitReadiness(final long timeout, final TimeUnit unit) throws InterruptedException, TimeoutException {
-        if (!waitTillReadyCountDownLatch.await(timeout, unit)) {
+        if (!awaitReadiness(Duration.create(timeout, unit))) {
             throw new TimeoutException("Shard leaders failed to settle");
         }
     }
@@ -307,9 +323,18 @@ public abstract class AbstractDataStore implements DistributedDataStoreInterface
         throw new IllegalStateException("Failed to create Shard Manager", lastException);
     }
 
+    /**
+     * Future which completes when all shards settle for the first time.
+     *
+     * @return A Listenable future.
+     */
+    public final ListenableFuture<?> initialSettleFuture() {
+        return readinessFuture;
+    }
+
     @VisibleForTesting
-    public CountDownLatch getWaitTillReadyCountDownLatch() {
-        return waitTillReadyCountDownLatch;
+    SettableFuture<Void> readinessFuture() {
+        return readinessFuture;
     }
 
     @Override
index 0285b26..d28734b 100644 (file)
@@ -11,7 +11,7 @@ import static com.google.common.base.Preconditions.checkState;
 import static java.util.Objects.requireNonNull;
 
 import akka.actor.Props;
-import java.util.concurrent.CountDownLatch;
+import com.google.common.util.concurrent.SettableFuture;
 import org.opendaylight.controller.cluster.datastore.AbstractDataStore;
 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
 import org.opendaylight.controller.cluster.datastore.DatastoreContextFactory;
@@ -20,11 +20,11 @@ import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot
 import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache;
 
 public abstract class AbstractShardManagerCreator<T extends AbstractShardManagerCreator<T>> {
+    private SettableFuture<Void> readinessFuture;
     private ClusterWrapper cluster;
     private Configuration configuration;
     private DatastoreContextFactory datastoreContextFactory;
     private AbstractDataStore distributedDataStore;
-    private CountDownLatch waitTillReadyCountDownLatch;
     private PrimaryShardInfoFutureCache primaryShardInfoCache;
     private DatastoreSnapshot restoreFromSnapshot;
     private volatile boolean sealed;
@@ -82,13 +82,13 @@ public abstract class AbstractShardManagerCreator<T extends AbstractShardManager
         return self();
     }
 
-    CountDownLatch getWaitTillReadyCountDownLatch() {
-        return waitTillReadyCountDownLatch;
+    SettableFuture<Void> getReadinessFuture() {
+        return readinessFuture;
     }
 
-    public T waitTillReadyCountDownLatch(final CountDownLatch newWaitTillReadyCountDownLatch) {
+    public T readinessFuture(final SettableFuture<Void> newReadinessFuture) {
         checkSealed();
-        this.waitTillReadyCountDownLatch = newWaitTillReadyCountDownLatch;
+        this.readinessFuture = newReadinessFuture;
         return self();
     }
 
@@ -118,7 +118,7 @@ public abstract class AbstractShardManagerCreator<T extends AbstractShardManager
         requireNonNull(configuration, "configuration should not be null");
         requireNonNull(datastoreContextFactory, "datastoreContextFactory should not be null");
         requireNonNull(distributedDataStore, "distributedDataStore should not be null");
-        requireNonNull(waitTillReadyCountDownLatch, "waitTillReadyCountdownLatch should not be null");
+        requireNonNull(readinessFuture, "readinessFuture should not be null");
         requireNonNull(primaryShardInfoCache, "primaryShardInfoCache should not be null");
     }
 
index d1b9aba..52f642c 100644 (file)
@@ -35,6 +35,7 @@ import akka.persistence.SnapshotOffer;
 import akka.persistence.SnapshotSelectionCriteria;
 import akka.util.Timeout;
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.SettableFuture;
 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -45,7 +46,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
-import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.function.Consumer;
@@ -156,7 +156,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
     private DatastoreContextFactory datastoreContextFactory;
 
-    private final CountDownLatch waitTillReadyCountdownLatch;
+    private final SettableFuture<Void> readinessFuture;
 
     private final PrimaryShardInfoFutureCache primaryShardInfoCache;
 
@@ -187,7 +187,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         this.type = datastoreContextFactory.getBaseDatastoreContext().getDataStoreName();
         this.shardDispatcherPath =
                 new Dispatchers(context().system().dispatchers()).getDispatcherPath(Dispatchers.DispatcherType.Shard);
-        this.waitTillReadyCountdownLatch = builder.getWaitTillReadyCountDownLatch();
+        this.readinessFuture = builder.getReadinessFuture();
         this.primaryShardInfoCache = builder.getPrimaryShardInfoCache();
         this.restoreFromSnapshot = builder.getRestoreFromSnapshot();
 
@@ -761,10 +761,8 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
     private void checkReady() {
         if (isReadyWithLeaderId()) {
-            LOG.info("{}: All Shards are ready - data store {} is ready, available count is {}",
-                    persistenceId(), type, waitTillReadyCountdownLatch.getCount());
-
-            waitTillReadyCountdownLatch.countDown();
+            LOG.info("{}: All Shards are ready - data store {} is ready", persistenceId(), type);
+            readinessFuture.set(null);
         }
     }
 
index 283e686..9e05b7f 100644 (file)
@@ -14,11 +14,10 @@ import static org.mockito.MockitoAnnotations.initMocks;
 import akka.actor.ActorRef;
 import akka.actor.PoisonPill;
 import akka.actor.Props;
-import java.util.concurrent.CountDownLatch;
+import com.google.common.util.concurrent.SettableFuture;
 import java.util.concurrent.TimeUnit;
 import org.junit.After;
 import org.junit.Before;
-import org.mockito.Mock;
 import org.opendaylight.controller.cluster.access.concepts.MemberName;
 import org.opendaylight.controller.cluster.datastore.config.Configuration;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
@@ -42,8 +41,7 @@ public class AbstractShardManagerTest extends AbstractClusterRefActorTest {
             .dataStoreName(shardMrgIDSuffix).shardInitializationTimeout(600, TimeUnit.MILLISECONDS)
             .shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(6);
 
-    @Mock
-    protected static CountDownLatch ready;
+    protected static SettableFuture<Void> ready;
 
     protected TestShardManager.Builder newTestShardMgrBuilder() {
         return TestShardManager.builder(datastoreContextBuilder).distributedDataStore(mock(DistributedDataStore.class));
@@ -55,12 +53,13 @@ public class AbstractShardManagerTest extends AbstractClusterRefActorTest {
     }
 
     protected Props newShardMgrProps(final Configuration config) {
-        return newTestShardMgrBuilder(config).waitTillReadyCountDownLatch(ready).props();
+        return newTestShardMgrBuilder(config).readinessFuture(ready).props();
     }
 
     @Before
     public void setUp() {
         initMocks(this);
+        ready = SettableFuture.create();
 
         InMemoryJournal.clear();
         InMemorySnapshotStore.clear();
index 8ddf854..56bbbf5 100644 (file)
@@ -124,7 +124,7 @@ public class DistributedDataStoreTest extends AbstractActorTest {
 
             Executors.newSingleThreadExecutor().submit(() -> {
                 Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
-                distributedDataStore.getWaitTillReadyCountDownLatch().countDown();
+                distributedDataStore.readinessFuture().set(null);
             });
 
             long start = System.currentTimeMillis();
index 6418300..1be0fa8 100644 (file)
@@ -17,10 +17,8 @@ import static org.junit.Assert.fail;
 import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.reset;
 import static org.mockito.Mockito.timeout;
-import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.verifyNoMoreInteractions;
 
@@ -1049,13 +1047,11 @@ public class ShardManagerTest extends AbstractShardManagerTest {
         String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
         shardManager.handleCommand(new RoleChangeNotification(
                 memberId, RaftState.Candidate.name(), RaftState.Leader.name()));
-
-        verify(ready, never()).countDown();
+        assertFalse(ready.isDone());
 
         shardManager.handleCommand(new ShardLeaderStateChanged(memberId, memberId,
                 mock(DataTree.class), DataStoreVersions.CURRENT_VERSION));
-
-        verify(ready, times(1)).countDown();
+        assertTrue(ready.isDone());
     }
 
     @Test
@@ -1065,16 +1061,14 @@ public class ShardManagerTest extends AbstractShardManagerTest {
 
         String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
         shardManager.handleCommand(new RoleChangeNotification(memberId, null, RaftState.Follower.name()));
-
-        verify(ready, never()).countDown();
+        assertFalse(ready.isDone());
 
         shardManager.handleCommand(MockClusterWrapper.createMemberUp("member-2", kit.getRef().path().toString()));
 
         shardManager.handleCommand(
             new ShardLeaderStateChanged(memberId, "member-2-shard-default-" + shardMrgIDSuffix,
                 mock(DataTree.class), DataStoreVersions.CURRENT_VERSION));
-
-        verify(ready, times(1)).countDown();
+        assertTrue(ready.isDone());
     }
 
     @Test
@@ -1084,16 +1078,14 @@ public class ShardManagerTest extends AbstractShardManagerTest {
 
         String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
         shardManager.handleCommand(new RoleChangeNotification(memberId, null, RaftState.Follower.name()));
-
-        verify(ready, never()).countDown();
+        assertFalse(ready.isDone());
 
         shardManager.handleCommand(
             new ShardLeaderStateChanged(memberId, "member-2-shard-default-" + shardMrgIDSuffix,
                 mock(DataTree.class), DataStoreVersions.CURRENT_VERSION));
 
         shardManager.handleCommand(MockClusterWrapper.createMemberUp("member-2", kit.getRef().path().toString()));
-
-        verify(ready, times(1)).countDown();
+        assertTrue(ready.isDone());
     }
 
     @Test
@@ -1102,8 +1094,7 @@ public class ShardManagerTest extends AbstractShardManagerTest {
 
         shardManager.handleCommand(new RoleChangeNotification("unknown", RaftState.Candidate.name(),
             RaftState.Leader.name()));
-
-        verify(ready, never()).countDown();
+        assertFalse(ready.isDone());
     }
 
     @Test
@@ -2267,7 +2258,7 @@ public class ShardManagerTest extends AbstractShardManagerTest {
 
         AbstractGenericCreator(final Class<C> shardManagerClass) {
             this.shardManagerClass = shardManagerClass;
-            cluster(new MockClusterWrapper()).configuration(new MockConfiguration()).waitTillReadyCountDownLatch(ready)
+            cluster(new MockClusterWrapper()).configuration(new MockConfiguration()).readinessFuture(ready)
                     .primaryShardInfoCache(new PrimaryShardInfoFutureCache());
         }
 

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.