When we are starting the datastore, we wait up to a fixed number of
election timeout intervals (3) for shards to finish electing a leader.
This is not always enough, especially if recovery takes a long time,
hence introduce initial-settle-timeout-multiplier to make this
tunable.
JIRA: CONTROLLER-1914
Change-Id: Iba1d116d0248fc6046aeeae3ec30ecac50f373c9
Signed-off-by: Tibor Král <tibor.kral@pantheon.tech>
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
# the stack trace of the creator of the Tx when there is an exception when the transaction is submitted
# (e.g. for a failed validation). Defaults to false due to performance impact.
#transaction-debug-context-enabled=true
# the stack trace of the creator of the Tx when there is an exception when the transaction is submitted
# (e.g. for a failed validation). Defaults to false due to performance impact.
#transaction-debug-context-enabled=true
+
+# Multiplier of shard-leader-election-timeout-in-seconds for the purposes of initial datastore
+# convergence. Each frontend datastore instance will wait the specified amount of time before
+# becoming exposed as a service. A value of 0 indicates waiting forever. Defaults to 3.
+initial-settle-timeout-multiplier=3
import akka.actor.ActorSystem;
import akka.actor.PoisonPill;
import akka.actor.Props;
import akka.actor.ActorSystem;
import akka.actor.PoisonPill;
import akka.actor.Props;
+import com.google.common.annotations.Beta;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Throwables;
import com.google.common.util.concurrent.Uninterruptibles;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Throwables;
import com.google.common.util.concurrent.Uninterruptibles;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
import org.opendaylight.controller.cluster.common.actor.Dispatchers;
import org.opendaylight.controller.cluster.databroker.actors.dds.DataStoreClient;
import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
import org.opendaylight.controller.cluster.common.actor.Dispatchers;
import org.opendaylight.controller.cluster.databroker.actors.dds.DataStoreClient;
import org.opendaylight.yangtools.yang.model.api.SchemaContextListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.opendaylight.yangtools.yang.model.api.SchemaContextListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import scala.concurrent.duration.Duration;
/**
* Base implementation of a distributed DOMStore.
/**
* Base implementation of a distributed DOMStore.
private static final Logger LOG = LoggerFactory.getLogger(AbstractDataStore.class);
private static final Logger LOG = LoggerFactory.getLogger(AbstractDataStore.class);
- private static final long READY_WAIT_FACTOR = 3;
-
+ private final CountDownLatch waitTillReadyCountDownLatch = new CountDownLatch(1);
+ private final ClientIdentifier identifier;
+ private final DataStoreClient client;
private final ActorUtils actorUtils;
private final ActorUtils actorUtils;
- private final long waitTillReadyTimeInMillis;
private AutoCloseable closeable;
private AutoCloseable closeable;
private DatastoreConfigurationMXBeanImpl datastoreConfigMXBean;
private DatastoreConfigurationMXBeanImpl datastoreConfigMXBean;
private DatastoreInfoMXBeanImpl datastoreInfoMXBean;
private DatastoreInfoMXBeanImpl datastoreInfoMXBean;
- private final CountDownLatch waitTillReadyCountDownLatch = new CountDownLatch(1);
-
- private final ClientIdentifier identifier;
- private final DataStoreClient client;
-
@SuppressWarnings("checkstyle:IllegalCatch")
protected AbstractDataStore(final ActorSystem actorSystem, final ClusterWrapper cluster,
final Configuration configuration, final DatastoreContextFactory datastoreContextFactory,
@SuppressWarnings("checkstyle:IllegalCatch")
protected AbstractDataStore(final ActorSystem actorSystem, final ClusterWrapper cluster,
final Configuration configuration, final DatastoreContextFactory datastoreContextFactory,
identifier = client.getIdentifier();
LOG.debug("Distributed data store client {} started", identifier);
identifier = client.getIdentifier();
LOG.debug("Distributed data store client {} started", identifier);
- this.waitTillReadyTimeInMillis = actorUtils.getDatastoreContext().getShardLeaderElectionTimeout()
- .duration().toMillis() * READY_WAIT_FACTOR;
-
datastoreConfigMXBean = new DatastoreConfigurationMXBeanImpl(
datastoreContextFactory.getBaseDatastoreContext().getDataStoreMXBeanType());
datastoreConfigMXBean.setContext(datastoreContextFactory.getBaseDatastoreContext());
datastoreConfigMXBean = new DatastoreConfigurationMXBeanImpl(
datastoreContextFactory.getBaseDatastoreContext().getDataStoreMXBeanType());
datastoreConfigMXBean.setContext(datastoreContextFactory.getBaseDatastoreContext());
this.actorUtils = requireNonNull(actorUtils, "actorContext should not be null");
this.client = null;
this.identifier = requireNonNull(identifier);
this.actorUtils = requireNonNull(actorUtils, "actorContext should not be null");
this.client = null;
this.identifier = requireNonNull(identifier);
- this.waitTillReadyTimeInMillis = actorUtils.getDatastoreContext().getShardLeaderElectionTimeout()
- .duration().toMillis() * READY_WAIT_FACTOR;
this.actorUtils = requireNonNull(actorUtils, "actorContext should not be null");
this.client = clientActor;
this.identifier = requireNonNull(identifier);
this.actorUtils = requireNonNull(actorUtils, "actorContext should not be null");
this.client = clientActor;
this.identifier = requireNonNull(identifier);
- this.waitTillReadyTimeInMillis = actorUtils.getDatastoreContext().getShardLeaderElectionTimeout()
- .duration().toMillis() * READY_WAIT_FACTOR;
}
protected AbstractShardManagerCreator<?> getShardManagerCreator() {
}
protected AbstractShardManagerCreator<?> getShardManagerCreator() {
+ // TODO: consider removing this in favor of awaitReadiness()
public void waitTillReady() {
LOG.info("Beginning to wait for data store to become ready : {}", identifier);
public void waitTillReady() {
LOG.info("Beginning to wait for data store to become ready : {}", identifier);
+ final Duration toWait = initialSettleTime();
- if (waitTillReadyCountDownLatch.await(waitTillReadyTimeInMillis, TimeUnit.MILLISECONDS)) {
- LOG.debug("Data store {} is now ready", identifier);
+ if (toWait.isFinite()) {
+ if (!waitTillReadyCountDownLatch.await(toWait.toNanos(), TimeUnit.NANOSECONDS)) {
+ LOG.error("Shard leaders failed to settle in {}, giving up", toWait);
+ return;
+ }
- LOG.error("Shard leaders failed to settle in {} seconds, giving up",
- TimeUnit.MILLISECONDS.toSeconds(waitTillReadyTimeInMillis));
+ waitTillReadyCountDownLatch.await();
}
} catch (InterruptedException e) {
LOG.error("Interrupted while waiting for shards to settle", e);
}
} catch (InterruptedException e) {
LOG.error("Interrupted while waiting for shards to settle", e);
+ return;
+ }
+
+ LOG.debug("Data store {} is now ready", identifier);
+ }
+
+ @Beta
+ public void awaitReadiness(final long timeout, final TimeUnit unit) throws InterruptedException, TimeoutException {
+ if (!waitTillReadyCountDownLatch.await(timeout, unit)) {
+ throw new TimeoutException("Shard leaders failed to settle");
return (ListenerRegistration<L>) proxy;
}
return (ListenerRegistration<L>) proxy;
}
+ private Duration initialSettleTime() {
+ final DatastoreContext context = actorUtils.getDatastoreContext();
+ final int multiplier = context.getInitialSettleTimeoutMultiplier();
+ return multiplier == 0 ? Duration.Inf() : context.getShardLeaderElectionTimeout().duration().$times(multiplier);
+ }
public static final int DEFAULT_SHARD_TX_COMMIT_QUEUE_CAPACITY = 50000;
public static final Timeout DEFAULT_SHARD_INITIALIZATION_TIMEOUT = new Timeout(5, TimeUnit.MINUTES);
public static final Timeout DEFAULT_SHARD_LEADER_ELECTION_TIMEOUT = new Timeout(30, TimeUnit.SECONDS);
public static final int DEFAULT_SHARD_TX_COMMIT_QUEUE_CAPACITY = 50000;
public static final Timeout DEFAULT_SHARD_INITIALIZATION_TIMEOUT = new Timeout(5, TimeUnit.MINUTES);
public static final Timeout DEFAULT_SHARD_LEADER_ELECTION_TIMEOUT = new Timeout(30, TimeUnit.SECONDS);
+ public static final int DEFAULT_INITIAL_SETTLE_TIMEOUT_MULTIPLIER = 3;
public static final boolean DEFAULT_PERSISTENT = true;
public static final FileAkkaConfigurationReader DEFAULT_CONFIGURATION_READER = new FileAkkaConfigurationReader();
public static final int DEFAULT_SHARD_SNAPSHOT_DATA_THRESHOLD_PERCENTAGE = 12;
public static final boolean DEFAULT_PERSISTENT = true;
public static final FileAkkaConfigurationReader DEFAULT_CONFIGURATION_READER = new FileAkkaConfigurationReader();
public static final int DEFAULT_SHARD_SNAPSHOT_DATA_THRESHOLD_PERCENTAGE = 12;
private int shardTransactionCommitQueueCapacity = DEFAULT_SHARD_TX_COMMIT_QUEUE_CAPACITY;
private Timeout shardInitializationTimeout = DEFAULT_SHARD_INITIALIZATION_TIMEOUT;
private Timeout shardLeaderElectionTimeout = DEFAULT_SHARD_LEADER_ELECTION_TIMEOUT;
private int shardTransactionCommitQueueCapacity = DEFAULT_SHARD_TX_COMMIT_QUEUE_CAPACITY;
private Timeout shardInitializationTimeout = DEFAULT_SHARD_INITIALIZATION_TIMEOUT;
private Timeout shardLeaderElectionTimeout = DEFAULT_SHARD_LEADER_ELECTION_TIMEOUT;
+ private int initialSettleTimeoutMultiplier = DEFAULT_INITIAL_SETTLE_TIMEOUT_MULTIPLIER;
private boolean persistent = DEFAULT_PERSISTENT;
private AkkaConfigurationReader configurationReader = DEFAULT_CONFIGURATION_READER;
private long transactionCreationInitialRateLimit = DEFAULT_TX_CREATION_INITIAL_RATE_LIMIT;
private boolean persistent = DEFAULT_PERSISTENT;
private AkkaConfigurationReader configurationReader = DEFAULT_CONFIGURATION_READER;
private long transactionCreationInitialRateLimit = DEFAULT_TX_CREATION_INITIAL_RATE_LIMIT;
this.shardTransactionCommitQueueCapacity = other.shardTransactionCommitQueueCapacity;
this.shardInitializationTimeout = other.shardInitializationTimeout;
this.shardLeaderElectionTimeout = other.shardLeaderElectionTimeout;
this.shardTransactionCommitQueueCapacity = other.shardTransactionCommitQueueCapacity;
this.shardInitializationTimeout = other.shardInitializationTimeout;
this.shardLeaderElectionTimeout = other.shardLeaderElectionTimeout;
+ this.initialSettleTimeoutMultiplier = other.initialSettleTimeoutMultiplier;
this.persistent = other.persistent;
this.configurationReader = other.configurationReader;
this.transactionCreationInitialRateLimit = other.transactionCreationInitialRateLimit;
this.persistent = other.persistent;
this.configurationReader = other.configurationReader;
this.transactionCreationInitialRateLimit = other.transactionCreationInitialRateLimit;
return shardLeaderElectionTimeout;
}
return shardLeaderElectionTimeout;
}
+ /**
+ * Return the multiplier of {@link #getShardLeaderElectionTimeout()} which the frontend will wait for all shards
+ * on the local node to settle.
+ *
+ * @return Non-negative multiplier. Value of {@code 0} indicates to wait indefinitely.
+ */
+ public int getInitialSettleTimeoutMultiplier() {
+ return initialSettleTimeoutMultiplier;
+ }
+
public boolean isPersistent() {
return persistent;
}
public boolean isPersistent() {
return persistent;
}
+ public Builder initialSettleTimeoutMultiplier(final int multiplier) {
+ checkArgument(multiplier >= 0);
+ datastoreContext.initialSettleTimeoutMultiplier = multiplier;
+ return this;
+ }
+
public Builder shardLeaderElectionTimeoutInSeconds(final long timeout) {
return shardLeaderElectionTimeout(timeout, TimeUnit.SECONDS);
}
public Builder shardLeaderElectionTimeoutInSeconds(final long timeout) {
return shardLeaderElectionTimeout(timeout, TimeUnit.SECONDS);
}
introspector, updater, null);
}
introspector, updater, null);
}
+ // TODO: separate out settle wait so it is better controlled
public static AbstractDataStore createInstance(final DOMSchemaService schemaService,
final DatastoreContext initialDatastoreContext, final DatastoreSnapshotRestore datastoreSnapshotRestore,
final ActorSystemProvider actorSystemProvider, final DatastoreContextIntrospector introspector,
public static AbstractDataStore createInstance(final DOMSchemaService schemaService,
final DatastoreContext initialDatastoreContext, final DatastoreSnapshotRestore datastoreSnapshotRestore,
final ActorSystemProvider actorSystemProvider, final DatastoreContextIntrospector introspector,
an operation (eg transaction create).";
}
an operation (eg transaction create).";
}
+ leaf initial-settle-timeout-multiplier {
+ default 3;
+ type uint32;
+ description "Multiplier for the maximum amount of time to wait for a shard to elect a leader.
+ Zero value means wait indefinitely (as long as it takes).";
+ }
+
leaf shard-batched-modification-count {
default 1000;
type non-zero-uint32-type;
leaf shard-batched-modification-count {
default 1000;
type non-zero-uint32-type;
properties.put("shard-transaction-commit-queue-capacity", "567");
properties.put("shard-initialization-timeout-in-seconds", "82");
properties.put("shard-leader-election-timeout-in-seconds", "66");
properties.put("shard-transaction-commit-queue-capacity", "567");
properties.put("shard-initialization-timeout-in-seconds", "82");
properties.put("shard-leader-election-timeout-in-seconds", "66");
+ properties.put("initial-settle-timeout-multiplier", "5");
properties.put("shard-isolated-leader-check-interval-in-millis", "123");
properties.put("shard-snapshot-data-threshold-percentage", "100");
properties.put("shard-election-timeout-factor", "21");
properties.put("shard-isolated-leader-check-interval-in-millis", "123");
properties.put("shard-snapshot-data-threshold-percentage", "100");
properties.put("shard-election-timeout-factor", "21");
assertEquals(567, context.getShardTransactionCommitQueueCapacity());
assertEquals(82, context.getShardInitializationTimeout().duration().toSeconds());
assertEquals(66, context.getShardLeaderElectionTimeout().duration().toSeconds());
assertEquals(567, context.getShardTransactionCommitQueueCapacity());
assertEquals(82, context.getShardInitializationTimeout().duration().toSeconds());
assertEquals(66, context.getShardLeaderElectionTimeout().duration().toSeconds());
+ assertEquals(5, context.getInitialSettleTimeoutMultiplier());
assertEquals(123, context.getShardRaftConfig().getIsolatedCheckIntervalInMillis());
assertEquals(100, context.getShardRaftConfig().getSnapshotDataThresholdPercentage());
assertEquals(21, context.getShardRaftConfig().getElectionTimeoutFactor());
assertEquals(123, context.getShardRaftConfig().getIsolatedCheckIntervalInMillis());
assertEquals(100, context.getShardRaftConfig().getSnapshotDataThresholdPercentage());
assertEquals(21, context.getShardRaftConfig().getElectionTimeoutFactor());
properties.put("operation-timeout-in-seconds", "27");
properties.put("shard-heartbeat-interval-in-millis", "102");
properties.put("shard-election-timeout-factor", "22");
properties.put("operation-timeout-in-seconds", "27");
properties.put("shard-heartbeat-interval-in-millis", "102");
properties.put("shard-election-timeout-factor", "22");
+ properties.put("initial-settle-timeout-multiplier", "6");
properties.put("max-shard-data-change-executor-pool-size", "42");
properties.put("max-shard-data-store-executor-queue-size", "4444");
properties.put("persistent", "true");
properties.put("max-shard-data-change-executor-pool-size", "42");
properties.put("max-shard-data-store-executor-queue-size", "4444");
properties.put("persistent", "true");
assertEquals(567, context.getShardTransactionCommitQueueCapacity());
assertEquals(82, context.getShardInitializationTimeout().duration().toSeconds());
assertEquals(66, context.getShardLeaderElectionTimeout().duration().toSeconds());
assertEquals(567, context.getShardTransactionCommitQueueCapacity());
assertEquals(82, context.getShardInitializationTimeout().duration().toSeconds());
assertEquals(66, context.getShardLeaderElectionTimeout().duration().toSeconds());
+ assertEquals(6, context.getInitialSettleTimeoutMultiplier());
assertEquals(123, context.getShardRaftConfig().getIsolatedCheckIntervalInMillis());
assertEquals(100, context.getShardRaftConfig().getSnapshotDataThresholdPercentage());
assertEquals(22, context.getShardRaftConfig().getElectionTimeoutFactor());
assertEquals(123, context.getShardRaftConfig().getIsolatedCheckIntervalInMillis());
assertEquals(100, context.getShardRaftConfig().getSnapshotDataThresholdPercentage());
assertEquals(22, context.getShardRaftConfig().getElectionTimeoutFactor());
import static org.opendaylight.controller.cluster.datastore.DatastoreContext.DEFAULT_CONFIGURATION_READER;
import static org.opendaylight.controller.cluster.datastore.DatastoreContext.DEFAULT_HEARTBEAT_INTERVAL_IN_MILLIS;
import static org.opendaylight.controller.cluster.datastore.DatastoreContext.DEFAULT_INITIAL_PAYLOAD_SERIALIZED_BUFFER_CAPACITY;
import static org.opendaylight.controller.cluster.datastore.DatastoreContext.DEFAULT_CONFIGURATION_READER;
import static org.opendaylight.controller.cluster.datastore.DatastoreContext.DEFAULT_HEARTBEAT_INTERVAL_IN_MILLIS;
import static org.opendaylight.controller.cluster.datastore.DatastoreContext.DEFAULT_INITIAL_PAYLOAD_SERIALIZED_BUFFER_CAPACITY;
+import static org.opendaylight.controller.cluster.datastore.DatastoreContext.DEFAULT_INITIAL_SETTLE_TIMEOUT_MULTIPLIER;
import static org.opendaylight.controller.cluster.datastore.DatastoreContext.DEFAULT_ISOLATED_LEADER_CHECK_INTERVAL_IN_MILLIS;
import static org.opendaylight.controller.cluster.datastore.DatastoreContext.DEFAULT_JOURNAL_RECOVERY_BATCH_SIZE;
import static org.opendaylight.controller.cluster.datastore.DatastoreContext.DEFAULT_MAX_MESSAGE_SLICE_SIZE;
import static org.opendaylight.controller.cluster.datastore.DatastoreContext.DEFAULT_ISOLATED_LEADER_CHECK_INTERVAL_IN_MILLIS;
import static org.opendaylight.controller.cluster.datastore.DatastoreContext.DEFAULT_JOURNAL_RECOVERY_BATCH_SIZE;
import static org.opendaylight.controller.cluster.datastore.DatastoreContext.DEFAULT_MAX_MESSAGE_SLICE_SIZE;
context.getShardInitializationTimeout().duration().toMillis());
assertEquals(DEFAULT_SHARD_LEADER_ELECTION_TIMEOUT.duration().toMillis(),
context.getShardLeaderElectionTimeout().duration().toMillis());
context.getShardInitializationTimeout().duration().toMillis());
assertEquals(DEFAULT_SHARD_LEADER_ELECTION_TIMEOUT.duration().toMillis(),
context.getShardLeaderElectionTimeout().duration().toMillis());
+ assertEquals(DEFAULT_INITIAL_SETTLE_TIMEOUT_MULTIPLIER,
+ context.getInitialSettleTimeoutMultiplier());
assertEquals(DEFAULT_PERSISTENT, context.isPersistent());
assertEquals(DEFAULT_CONFIGURATION_READER, context.getConfigurationReader());
assertEquals(DEFAULT_ISOLATED_LEADER_CHECK_INTERVAL_IN_MILLIS,
assertEquals(DEFAULT_PERSISTENT, context.isPersistent());
assertEquals(DEFAULT_CONFIGURATION_READER, context.getConfigurationReader());
assertEquals(DEFAULT_ISOLATED_LEADER_CHECK_INTERVAL_IN_MILLIS,
TimeUnit.MILLISECONDS);
builder.shardLeaderElectionTimeout(DEFAULT_SHARD_LEADER_ELECTION_TIMEOUT.duration().toMillis() + 1,
TimeUnit.MILLISECONDS);
TimeUnit.MILLISECONDS);
builder.shardLeaderElectionTimeout(DEFAULT_SHARD_LEADER_ELECTION_TIMEOUT.duration().toMillis() + 1,
TimeUnit.MILLISECONDS);
+ builder.initialSettleTimeoutMultiplier(DEFAULT_INITIAL_SETTLE_TIMEOUT_MULTIPLIER + 1);
builder.persistent(!DEFAULT_PERSISTENT);
builder.shardIsolatedLeaderCheckIntervalInMillis(DEFAULT_ISOLATED_LEADER_CHECK_INTERVAL_IN_MILLIS + 1);
builder.shardSnapshotDataThresholdPercentage(DEFAULT_SHARD_SNAPSHOT_DATA_THRESHOLD_PERCENTAGE + 1);
builder.persistent(!DEFAULT_PERSISTENT);
builder.shardIsolatedLeaderCheckIntervalInMillis(DEFAULT_ISOLATED_LEADER_CHECK_INTERVAL_IN_MILLIS + 1);
builder.shardSnapshotDataThresholdPercentage(DEFAULT_SHARD_SNAPSHOT_DATA_THRESHOLD_PERCENTAGE + 1);
context.getShardInitializationTimeout().duration().toMillis());
assertEquals(DEFAULT_SHARD_LEADER_ELECTION_TIMEOUT.duration().toMillis() + 1,
context.getShardLeaderElectionTimeout().duration().toMillis());
context.getShardInitializationTimeout().duration().toMillis());
assertEquals(DEFAULT_SHARD_LEADER_ELECTION_TIMEOUT.duration().toMillis() + 1,
context.getShardLeaderElectionTimeout().duration().toMillis());
+ assertEquals(DEFAULT_INITIAL_SETTLE_TIMEOUT_MULTIPLIER + 1,
+ context.getInitialSettleTimeoutMultiplier());
assertEquals(!DEFAULT_PERSISTENT, context.isPersistent());
assertEquals(DEFAULT_CONFIGURATION_READER, context.getConfigurationReader());
assertEquals(DEFAULT_ISOLATED_LEADER_CHECK_INTERVAL_IN_MILLIS + 1,
assertEquals(!DEFAULT_PERSISTENT, context.isPersistent());
assertEquals(DEFAULT_CONFIGURATION_READER, context.getConfigurationReader());
assertEquals(DEFAULT_ISOLATED_LEADER_CHECK_INTERVAL_IN_MILLIS + 1,
public void testWaitTillReadyBlocking() {
doReturn(datastoreContext).when(actorUtils).getDatastoreContext();
doReturn(shardElectionTimeout).when(datastoreContext).getShardLeaderElectionTimeout();
public void testWaitTillReadyBlocking() {
doReturn(datastoreContext).when(actorUtils).getDatastoreContext();
doReturn(shardElectionTimeout).when(datastoreContext).getShardLeaderElectionTimeout();
+ doReturn(1).when(datastoreContext).getInitialSettleTimeoutMultiplier();
doReturn(FiniteDuration.apply(50, TimeUnit.MILLISECONDS)).when(shardElectionTimeout).duration();
try (DistributedDataStore distributedDataStore = new DistributedDataStore(actorUtils, UNKNOWN_ID)) {
doReturn(FiniteDuration.apply(50, TimeUnit.MILLISECONDS)).when(shardElectionTimeout).duration();
try (DistributedDataStore distributedDataStore = new DistributedDataStore(actorUtils, UNKNOWN_ID)) {