summary |
shortlog |
log |
commit | commitdiff |
review |
tree
raw |
patch |
inline | side by side (from parent 1:
2d4e6da)
Using a CountDownLatch is not composable, which leads to current
layout. Switch to using a SettableFuture, which can be accessed
via AbstractDataStore.initialSettleFuture().
This allows us to externalize the settle policy, letting callers
decide what to actually do.
JIRA: CONTROLLER-1882
Change-Id: Iaf9a359cfc2507ae35688fca3673c13713c2b427
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
import com.google.common.annotations.Beta;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Throwables;
import com.google.common.annotations.Beta;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Throwables;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
import com.google.common.util.concurrent.Uninterruptibles;
import java.util.Set;
import com.google.common.util.concurrent.Uninterruptibles;
import java.util.Set;
-import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
private static final Logger LOG = LoggerFactory.getLogger(AbstractDataStore.class);
private static final Logger LOG = LoggerFactory.getLogger(AbstractDataStore.class);
- private final CountDownLatch waitTillReadyCountDownLatch = new CountDownLatch(1);
+ private final SettableFuture<Void> readinessFuture = SettableFuture.create();
private final ClientIdentifier identifier;
private final DataStoreClient client;
private final ActorUtils actorUtils;
private final ClientIdentifier identifier;
private final DataStoreClient client;
private final ActorUtils actorUtils;
AbstractShardManagerCreator<?> creator = getShardManagerCreator().cluster(cluster).configuration(configuration)
.datastoreContextFactory(datastoreContextFactory)
AbstractShardManagerCreator<?> creator = getShardManagerCreator().cluster(cluster).configuration(configuration)
.datastoreContextFactory(datastoreContextFactory)
- .waitTillReadyCountDownLatch(waitTillReadyCountDownLatch)
+ .readinessFuture(readinessFuture)
.primaryShardInfoCache(primaryShardInfoCache)
.restoreFromSnapshot(restoreFromSnapshot)
.distributedDataStore(this);
.primaryShardInfoCache(primaryShardInfoCache)
.restoreFromSnapshot(restoreFromSnapshot)
.distributedDataStore(this);
}
// TODO: consider removing this in favor of awaitReadiness()
}
// TODO: consider removing this in favor of awaitReadiness()
public void waitTillReady() {
LOG.info("Beginning to wait for data store to become ready : {}", identifier);
public void waitTillReady() {
LOG.info("Beginning to wait for data store to become ready : {}", identifier);
public boolean awaitReadiness() throws InterruptedException {
return awaitReadiness(initialSettleTime());
}
@Beta
public boolean awaitReadiness() throws InterruptedException {
return awaitReadiness(initialSettleTime());
}
@Beta
public boolean awaitReadiness(final Duration toWait) throws InterruptedException {
public boolean awaitReadiness(final Duration toWait) throws InterruptedException {
- if (toWait.isFinite()) {
- return waitTillReadyCountDownLatch.await(toWait.toNanos(), TimeUnit.NANOSECONDS);
+ try {
+ if (toWait.isFinite()) {
+ try {
+ readinessFuture.get(toWait.toNanos(), TimeUnit.NANOSECONDS);
+ } catch (TimeoutException e) {
+ LOG.debug("Timed out waiting for shards to settle", e);
+ return false;
+ }
+ } else {
+ readinessFuture.get();
+ }
+ } catch (ExecutionException e) {
+ LOG.warn("Unexpected readiness failure, assuming convergence", e);
- waitTillReadyCountDownLatch.await();
public void awaitReadiness(final long timeout, final TimeUnit unit) throws InterruptedException, TimeoutException {
public void awaitReadiness(final long timeout, final TimeUnit unit) throws InterruptedException, TimeoutException {
- if (!waitTillReadyCountDownLatch.await(timeout, unit)) {
+ if (!awaitReadiness(Duration.create(timeout, unit))) {
throw new TimeoutException("Shard leaders failed to settle");
}
}
throw new TimeoutException("Shard leaders failed to settle");
}
}
throw new IllegalStateException("Failed to create Shard Manager", lastException);
}
throw new IllegalStateException("Failed to create Shard Manager", lastException);
}
+ /**
+ * Future which completes when all shards settle for the first time.
+ *
+ * @return A Listenable future.
+ */
+ public final ListenableFuture<?> initialSettleFuture() {
+ return readinessFuture;
+ }
+
- public CountDownLatch getWaitTillReadyCountDownLatch() {
- return waitTillReadyCountDownLatch;
+ SettableFuture<Void> readinessFuture() {
+ return readinessFuture;
import static java.util.Objects.requireNonNull;
import akka.actor.Props;
import static java.util.Objects.requireNonNull;
import akka.actor.Props;
-import java.util.concurrent.CountDownLatch;
+import com.google.common.util.concurrent.SettableFuture;
import org.opendaylight.controller.cluster.datastore.AbstractDataStore;
import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
import org.opendaylight.controller.cluster.datastore.DatastoreContextFactory;
import org.opendaylight.controller.cluster.datastore.AbstractDataStore;
import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
import org.opendaylight.controller.cluster.datastore.DatastoreContextFactory;
import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache;
public abstract class AbstractShardManagerCreator<T extends AbstractShardManagerCreator<T>> {
import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache;
public abstract class AbstractShardManagerCreator<T extends AbstractShardManagerCreator<T>> {
+ private SettableFuture<Void> readinessFuture;
private ClusterWrapper cluster;
private Configuration configuration;
private DatastoreContextFactory datastoreContextFactory;
private AbstractDataStore distributedDataStore;
private ClusterWrapper cluster;
private Configuration configuration;
private DatastoreContextFactory datastoreContextFactory;
private AbstractDataStore distributedDataStore;
- private CountDownLatch waitTillReadyCountDownLatch;
private PrimaryShardInfoFutureCache primaryShardInfoCache;
private DatastoreSnapshot restoreFromSnapshot;
private volatile boolean sealed;
private PrimaryShardInfoFutureCache primaryShardInfoCache;
private DatastoreSnapshot restoreFromSnapshot;
private volatile boolean sealed;
- CountDownLatch getWaitTillReadyCountDownLatch() {
- return waitTillReadyCountDownLatch;
+ SettableFuture<Void> getReadinessFuture() {
+ return readinessFuture;
- public T waitTillReadyCountDownLatch(final CountDownLatch newWaitTillReadyCountDownLatch) {
+ public T readinessFuture(final SettableFuture<Void> newReadinessFuture) {
- this.waitTillReadyCountDownLatch = newWaitTillReadyCountDownLatch;
+ this.readinessFuture = newReadinessFuture;
requireNonNull(configuration, "configuration should not be null");
requireNonNull(datastoreContextFactory, "datastoreContextFactory should not be null");
requireNonNull(distributedDataStore, "distributedDataStore should not be null");
requireNonNull(configuration, "configuration should not be null");
requireNonNull(datastoreContextFactory, "datastoreContextFactory should not be null");
requireNonNull(distributedDataStore, "distributedDataStore should not be null");
- requireNonNull(waitTillReadyCountDownLatch, "waitTillReadyCountdownLatch should not be null");
+ requireNonNull(readinessFuture, "readinessFuture should not be null");
requireNonNull(primaryShardInfoCache, "primaryShardInfoCache should not be null");
}
requireNonNull(primaryShardInfoCache, "primaryShardInfoCache should not be null");
}
import akka.persistence.SnapshotSelectionCriteria;
import akka.util.Timeout;
import com.google.common.annotations.VisibleForTesting;
import akka.persistence.SnapshotSelectionCriteria;
import akka.util.Timeout;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.SettableFuture;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.util.ArrayList;
import java.util.Collection;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
-import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
private DatastoreContextFactory datastoreContextFactory;
private DatastoreContextFactory datastoreContextFactory;
- private final CountDownLatch waitTillReadyCountdownLatch;
+ private final SettableFuture<Void> readinessFuture;
private final PrimaryShardInfoFutureCache primaryShardInfoCache;
private final PrimaryShardInfoFutureCache primaryShardInfoCache;
this.type = datastoreContextFactory.getBaseDatastoreContext().getDataStoreName();
this.shardDispatcherPath =
new Dispatchers(context().system().dispatchers()).getDispatcherPath(Dispatchers.DispatcherType.Shard);
this.type = datastoreContextFactory.getBaseDatastoreContext().getDataStoreName();
this.shardDispatcherPath =
new Dispatchers(context().system().dispatchers()).getDispatcherPath(Dispatchers.DispatcherType.Shard);
- this.waitTillReadyCountdownLatch = builder.getWaitTillReadyCountDownLatch();
+ this.readinessFuture = builder.getReadinessFuture();
this.primaryShardInfoCache = builder.getPrimaryShardInfoCache();
this.restoreFromSnapshot = builder.getRestoreFromSnapshot();
this.primaryShardInfoCache = builder.getPrimaryShardInfoCache();
this.restoreFromSnapshot = builder.getRestoreFromSnapshot();
private void checkReady() {
if (isReadyWithLeaderId()) {
private void checkReady() {
if (isReadyWithLeaderId()) {
- LOG.info("{}: All Shards are ready - data store {} is ready, available count is {}",
- persistenceId(), type, waitTillReadyCountdownLatch.getCount());
-
- waitTillReadyCountdownLatch.countDown();
+ LOG.info("{}: All Shards are ready - data store {} is ready", persistenceId(), type);
+ readinessFuture.set(null);
import akka.actor.ActorRef;
import akka.actor.PoisonPill;
import akka.actor.Props;
import akka.actor.ActorRef;
import akka.actor.PoisonPill;
import akka.actor.Props;
-import java.util.concurrent.CountDownLatch;
+import com.google.common.util.concurrent.SettableFuture;
import java.util.concurrent.TimeUnit;
import org.junit.After;
import org.junit.Before;
import java.util.concurrent.TimeUnit;
import org.junit.After;
import org.junit.Before;
-import org.mockito.Mock;
import org.opendaylight.controller.cluster.access.concepts.MemberName;
import org.opendaylight.controller.cluster.datastore.config.Configuration;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
import org.opendaylight.controller.cluster.access.concepts.MemberName;
import org.opendaylight.controller.cluster.datastore.config.Configuration;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
.dataStoreName(shardMrgIDSuffix).shardInitializationTimeout(600, TimeUnit.MILLISECONDS)
.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(6);
.dataStoreName(shardMrgIDSuffix).shardInitializationTimeout(600, TimeUnit.MILLISECONDS)
.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(6);
- @Mock
- protected static CountDownLatch ready;
+ protected static SettableFuture<Void> ready;
protected TestShardManager.Builder newTestShardMgrBuilder() {
return TestShardManager.builder(datastoreContextBuilder).distributedDataStore(mock(DistributedDataStore.class));
protected TestShardManager.Builder newTestShardMgrBuilder() {
return TestShardManager.builder(datastoreContextBuilder).distributedDataStore(mock(DistributedDataStore.class));
}
protected Props newShardMgrProps(final Configuration config) {
}
protected Props newShardMgrProps(final Configuration config) {
- return newTestShardMgrBuilder(config).waitTillReadyCountDownLatch(ready).props();
+ return newTestShardMgrBuilder(config).readinessFuture(ready).props();
}
@Before
public void setUp() {
initMocks(this);
}
@Before
public void setUp() {
initMocks(this);
+ ready = SettableFuture.create();
InMemoryJournal.clear();
InMemorySnapshotStore.clear();
InMemoryJournal.clear();
InMemorySnapshotStore.clear();
Executors.newSingleThreadExecutor().submit(() -> {
Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
Executors.newSingleThreadExecutor().submit(() -> {
Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
- distributedDataStore.getWaitTillReadyCountDownLatch().countDown();
+ distributedDataStore.readinessFuture().set(null);
});
long start = System.currentTimeMillis();
});
long start = System.currentTimeMillis();
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.never;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.timeout;
-import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
shardManager.handleCommand(new RoleChangeNotification(
memberId, RaftState.Candidate.name(), RaftState.Leader.name()));
String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
shardManager.handleCommand(new RoleChangeNotification(
memberId, RaftState.Candidate.name(), RaftState.Leader.name()));
-
- verify(ready, never()).countDown();
+ assertFalse(ready.isDone());
shardManager.handleCommand(new ShardLeaderStateChanged(memberId, memberId,
mock(DataTree.class), DataStoreVersions.CURRENT_VERSION));
shardManager.handleCommand(new ShardLeaderStateChanged(memberId, memberId,
mock(DataTree.class), DataStoreVersions.CURRENT_VERSION));
-
- verify(ready, times(1)).countDown();
+ assertTrue(ready.isDone());
String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
shardManager.handleCommand(new RoleChangeNotification(memberId, null, RaftState.Follower.name()));
String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
shardManager.handleCommand(new RoleChangeNotification(memberId, null, RaftState.Follower.name()));
-
- verify(ready, never()).countDown();
+ assertFalse(ready.isDone());
shardManager.handleCommand(MockClusterWrapper.createMemberUp("member-2", kit.getRef().path().toString()));
shardManager.handleCommand(
new ShardLeaderStateChanged(memberId, "member-2-shard-default-" + shardMrgIDSuffix,
mock(DataTree.class), DataStoreVersions.CURRENT_VERSION));
shardManager.handleCommand(MockClusterWrapper.createMemberUp("member-2", kit.getRef().path().toString()));
shardManager.handleCommand(
new ShardLeaderStateChanged(memberId, "member-2-shard-default-" + shardMrgIDSuffix,
mock(DataTree.class), DataStoreVersions.CURRENT_VERSION));
-
- verify(ready, times(1)).countDown();
+ assertTrue(ready.isDone());
String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
shardManager.handleCommand(new RoleChangeNotification(memberId, null, RaftState.Follower.name()));
String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
shardManager.handleCommand(new RoleChangeNotification(memberId, null, RaftState.Follower.name()));
-
- verify(ready, never()).countDown();
+ assertFalse(ready.isDone());
shardManager.handleCommand(
new ShardLeaderStateChanged(memberId, "member-2-shard-default-" + shardMrgIDSuffix,
mock(DataTree.class), DataStoreVersions.CURRENT_VERSION));
shardManager.handleCommand(MockClusterWrapper.createMemberUp("member-2", kit.getRef().path().toString()));
shardManager.handleCommand(
new ShardLeaderStateChanged(memberId, "member-2-shard-default-" + shardMrgIDSuffix,
mock(DataTree.class), DataStoreVersions.CURRENT_VERSION));
shardManager.handleCommand(MockClusterWrapper.createMemberUp("member-2", kit.getRef().path().toString()));
-
- verify(ready, times(1)).countDown();
+ assertTrue(ready.isDone());
shardManager.handleCommand(new RoleChangeNotification("unknown", RaftState.Candidate.name(),
RaftState.Leader.name()));
shardManager.handleCommand(new RoleChangeNotification("unknown", RaftState.Candidate.name(),
RaftState.Leader.name()));
-
- verify(ready, never()).countDown();
+ assertFalse(ready.isDone());
AbstractGenericCreator(final Class<C> shardManagerClass) {
this.shardManagerClass = shardManagerClass;
AbstractGenericCreator(final Class<C> shardManagerClass) {
this.shardManagerClass = shardManagerClass;
- cluster(new MockClusterWrapper()).configuration(new MockConfiguration()).waitTillReadyCountDownLatch(ready)
+ cluster(new MockClusterWrapper()).configuration(new MockConfiguration()).readinessFuture(ready)
.primaryShardInfoCache(new PrimaryShardInfoFutureCache());
}
.primaryShardInfoCache(new PrimaryShardInfoFutureCache());
}