# the stack trace of the creator of the Tx when there is an exception when the transaction is submitted
# (e.g. for a failed validation). Defaults to false due to performance impact.
#transaction-debug-context-enabled=true
+
+# Multiplier applied to shard-leader-election-timeout-in-seconds for the purposes of initial datastore
+# convergence. Each frontend datastore instance will wait the specified amount of time before becoming
+# exposed as a service. A value of 0 indicates waiting forever. Defaults to 3.
+initial-settle-timeout-multiplier=3
import akka.actor.ActorSystem;
import akka.actor.PoisonPill;
import akka.actor.Props;
+import com.google.common.annotations.Beta;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Throwables;
import com.google.common.util.concurrent.Uninterruptibles;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
import org.opendaylight.controller.cluster.common.actor.Dispatchers;
import org.opendaylight.controller.cluster.databroker.actors.dds.DataStoreClient;
import org.opendaylight.yangtools.yang.model.api.SchemaContextListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import scala.concurrent.duration.Duration;
/**
* Base implementation of a distributed DOMStore.
private static final Logger LOG = LoggerFactory.getLogger(AbstractDataStore.class);
- private static final long READY_WAIT_FACTOR = 3;
-
+ private final CountDownLatch waitTillReadyCountDownLatch = new CountDownLatch(1);
+ private final ClientIdentifier identifier;
+ private final DataStoreClient client;
private final ActorUtils actorUtils;
- private final long waitTillReadyTimeInMillis;
private AutoCloseable closeable;
-
private DatastoreConfigurationMXBeanImpl datastoreConfigMXBean;
-
private DatastoreInfoMXBeanImpl datastoreInfoMXBean;
- private final CountDownLatch waitTillReadyCountDownLatch = new CountDownLatch(1);
-
- private final ClientIdentifier identifier;
- private final DataStoreClient client;
-
@SuppressWarnings("checkstyle:IllegalCatch")
protected AbstractDataStore(final ActorSystem actorSystem, final ClusterWrapper cluster,
final Configuration configuration, final DatastoreContextFactory datastoreContextFactory,
identifier = client.getIdentifier();
LOG.debug("Distributed data store client {} started", identifier);
- this.waitTillReadyTimeInMillis = actorUtils.getDatastoreContext().getShardLeaderElectionTimeout()
- .duration().toMillis() * READY_WAIT_FACTOR;
-
datastoreConfigMXBean = new DatastoreConfigurationMXBeanImpl(
datastoreContextFactory.getBaseDatastoreContext().getDataStoreMXBeanType());
datastoreConfigMXBean.setContext(datastoreContextFactory.getBaseDatastoreContext());
this.actorUtils = requireNonNull(actorUtils, "actorContext should not be null");
this.client = null;
this.identifier = requireNonNull(identifier);
- this.waitTillReadyTimeInMillis = actorUtils.getDatastoreContext().getShardLeaderElectionTimeout()
- .duration().toMillis() * READY_WAIT_FACTOR;
}
@VisibleForTesting
this.actorUtils = requireNonNull(actorUtils, "actorContext should not be null");
this.client = clientActor;
this.identifier = requireNonNull(identifier);
- this.waitTillReadyTimeInMillis = actorUtils.getDatastoreContext().getShardLeaderElectionTimeout()
- .duration().toMillis() * READY_WAIT_FACTOR;
}
protected AbstractShardManagerCreator<?> getShardManagerCreator() {
return actorUtils;
}
+ // TODO: consider removing this in favor of awaitReadiness()
+ /**
+  * Block the calling thread until this datastore's shards have settled, or the configured initial
+  * settle bound elapses. The bound comes from {@code initialSettleTime()}: a finite bound logs an
+  * error and returns on expiry; an infinite bound (multiplier of 0) waits until the latch trips.
+  */
public void waitTillReady() {
LOG.info("Beginning to wait for data store to become ready : {}", identifier);
+ final Duration toWait = initialSettleTime();
try {
- if (waitTillReadyCountDownLatch.await(waitTillReadyTimeInMillis, TimeUnit.MILLISECONDS)) {
- LOG.debug("Data store {} is now ready", identifier);
+ if (toWait.isFinite()) {
+ // Bounded wait: give up (with an error log) if the latch does not trip in time.
+ if (!waitTillReadyCountDownLatch.await(toWait.toNanos(), TimeUnit.NANOSECONDS)) {
+ LOG.error("Shard leaders failed to settle in {}, giving up", toWait);
+ return;
+ }
} else {
- LOG.error("Shard leaders failed to settle in {} seconds, giving up",
- TimeUnit.MILLISECONDS.toSeconds(waitTillReadyTimeInMillis));
+ // Infinite bound: wait for the latch with no timeout.
+ waitTillReadyCountDownLatch.await();
}
} catch (InterruptedException e) {
+ // NOTE(review): interrupt status is swallowed here; consider Thread.currentThread().interrupt()
+ // before returning so callers can observe the interrupt.
LOG.error("Interrupted while waiting for shards to settle", e);
+ return;
+ }
+
+ LOG.debug("Data store {} is now ready", identifier);
+ }
+
+ /**
+  * Wait for the datastore readiness latch, failing fast on timeout.
+  *
+  * @param timeout maximum time to wait
+  * @param unit time unit of {@code timeout}
+  * @throws InterruptedException if the calling thread is interrupted while waiting
+  * @throws TimeoutException if the latch does not trip within the given timeout
+  */
+ @Beta
+ public void awaitReadiness(final long timeout, final TimeUnit unit) throws InterruptedException, TimeoutException {
+ if (!waitTillReadyCountDownLatch.await(timeout, unit)) {
+ throw new TimeoutException("Shard leaders failed to settle");
}
}
return (ListenerRegistration<L>) proxy;
}
+ // Compute the initial settle bound: shard-leader-election-timeout scaled by the configured
+ // multiplier, or Duration.Inf() (wait forever) when the multiplier is 0. $times is the Scala
+ // Duration '*' operator invoked from Java.
+ private Duration initialSettleTime() {
+ final DatastoreContext context = actorUtils.getDatastoreContext();
+ final int multiplier = context.getInitialSettleTimeoutMultiplier();
+ return multiplier == 0 ? Duration.Inf() : context.getShardLeaderElectionTimeout().duration().$times(multiplier);
+ }
}
public static final int DEFAULT_SHARD_TX_COMMIT_QUEUE_CAPACITY = 50000;
public static final Timeout DEFAULT_SHARD_INITIALIZATION_TIMEOUT = new Timeout(5, TimeUnit.MINUTES);
public static final Timeout DEFAULT_SHARD_LEADER_ELECTION_TIMEOUT = new Timeout(30, TimeUnit.SECONDS);
+ public static final int DEFAULT_INITIAL_SETTLE_TIMEOUT_MULTIPLIER = 3;
public static final boolean DEFAULT_PERSISTENT = true;
public static final FileAkkaConfigurationReader DEFAULT_CONFIGURATION_READER = new FileAkkaConfigurationReader();
public static final int DEFAULT_SHARD_SNAPSHOT_DATA_THRESHOLD_PERCENTAGE = 12;
private int shardTransactionCommitQueueCapacity = DEFAULT_SHARD_TX_COMMIT_QUEUE_CAPACITY;
private Timeout shardInitializationTimeout = DEFAULT_SHARD_INITIALIZATION_TIMEOUT;
private Timeout shardLeaderElectionTimeout = DEFAULT_SHARD_LEADER_ELECTION_TIMEOUT;
+ private int initialSettleTimeoutMultiplier = DEFAULT_INITIAL_SETTLE_TIMEOUT_MULTIPLIER;
private boolean persistent = DEFAULT_PERSISTENT;
private AkkaConfigurationReader configurationReader = DEFAULT_CONFIGURATION_READER;
private long transactionCreationInitialRateLimit = DEFAULT_TX_CREATION_INITIAL_RATE_LIMIT;
this.shardTransactionCommitQueueCapacity = other.shardTransactionCommitQueueCapacity;
this.shardInitializationTimeout = other.shardInitializationTimeout;
this.shardLeaderElectionTimeout = other.shardLeaderElectionTimeout;
+ this.initialSettleTimeoutMultiplier = other.initialSettleTimeoutMultiplier;
this.persistent = other.persistent;
this.configurationReader = other.configurationReader;
this.transactionCreationInitialRateLimit = other.transactionCreationInitialRateLimit;
return shardLeaderElectionTimeout;
}
+ /**
+ * Return the multiplier of {@link #getShardLeaderElectionTimeout()} for which the frontend will wait for all
+ * shards on the local node to settle before exposing itself as a service.
+ *
+ * @return Non-negative multiplier. A value of {@code 0} indicates to wait indefinitely.
+ */
+ public int getInitialSettleTimeoutMultiplier() {
+ return initialSettleTimeoutMultiplier;
+ }
+
public boolean isPersistent() {
return persistent;
}
return this;
}
+ /**
+  * Set the initial settle timeout multiplier, applied to the shard leader election timeout to bound
+  * the initial wait for shards to settle.
+  *
+  * @param multiplier non-negative multiplier; {@code 0} means wait indefinitely
+  * @return this builder
+  * @throws IllegalArgumentException if {@code multiplier} is negative
+  */
+ public Builder initialSettleTimeoutMultiplier(final int multiplier) {
+ checkArgument(multiplier >= 0);
+ datastoreContext.initialSettleTimeoutMultiplier = multiplier;
+ return this;
+ }
+
+
+ /**
+  * Set the shard leader election timeout, expressed in seconds.
+  *
+  * @param timeout timeout in seconds
+  * @return this builder
+  */
public Builder shardLeaderElectionTimeoutInSeconds(final long timeout) {
return shardLeaderElectionTimeout(timeout, TimeUnit.SECONDS);
}
introspector, updater, null);
}
+ // TODO: separate out settle wait so it is better controlled
public static AbstractDataStore createInstance(final DOMSchemaService schemaService,
final DatastoreContext initialDatastoreContext, final DatastoreSnapshotRestore datastoreSnapshotRestore,
final ActorSystemProvider actorSystemProvider, final DatastoreContextIntrospector introspector,
an operation (eg transaction create).";
}
+ leaf initial-settle-timeout-multiplier {
+ default 3;
+ type uint32;
+ description "Multiplier applied to shard-leader-election-timeout-in-seconds, bounding the initial
+ wait for local shards to settle. Zero value means wait indefinitely (as long as it takes).";
+ }
+
leaf shard-batched-modification-count {
default 1000;
type non-zero-uint32-type;
properties.put("shard-transaction-commit-queue-capacity", "567");
properties.put("shard-initialization-timeout-in-seconds", "82");
properties.put("shard-leader-election-timeout-in-seconds", "66");
+ properties.put("initial-settle-timeout-multiplier", "5");
properties.put("shard-isolated-leader-check-interval-in-millis", "123");
properties.put("shard-snapshot-data-threshold-percentage", "100");
properties.put("shard-election-timeout-factor", "21");
assertEquals(567, context.getShardTransactionCommitQueueCapacity());
assertEquals(82, context.getShardInitializationTimeout().duration().toSeconds());
assertEquals(66, context.getShardLeaderElectionTimeout().duration().toSeconds());
+ assertEquals(5, context.getInitialSettleTimeoutMultiplier());
assertEquals(123, context.getShardRaftConfig().getIsolatedCheckIntervalInMillis());
assertEquals(100, context.getShardRaftConfig().getSnapshotDataThresholdPercentage());
assertEquals(21, context.getShardRaftConfig().getElectionTimeoutFactor());
properties.put("operation-timeout-in-seconds", "27");
properties.put("shard-heartbeat-interval-in-millis", "102");
properties.put("shard-election-timeout-factor", "22");
+ properties.put("initial-settle-timeout-multiplier", "6");
properties.put("max-shard-data-change-executor-pool-size", "42");
properties.put("max-shard-data-store-executor-queue-size", "4444");
properties.put("persistent", "true");
assertEquals(567, context.getShardTransactionCommitQueueCapacity());
assertEquals(82, context.getShardInitializationTimeout().duration().toSeconds());
assertEquals(66, context.getShardLeaderElectionTimeout().duration().toSeconds());
+ assertEquals(6, context.getInitialSettleTimeoutMultiplier());
assertEquals(123, context.getShardRaftConfig().getIsolatedCheckIntervalInMillis());
assertEquals(100, context.getShardRaftConfig().getSnapshotDataThresholdPercentage());
assertEquals(22, context.getShardRaftConfig().getElectionTimeoutFactor());
import static org.opendaylight.controller.cluster.datastore.DatastoreContext.DEFAULT_CONFIGURATION_READER;
import static org.opendaylight.controller.cluster.datastore.DatastoreContext.DEFAULT_HEARTBEAT_INTERVAL_IN_MILLIS;
import static org.opendaylight.controller.cluster.datastore.DatastoreContext.DEFAULT_INITIAL_PAYLOAD_SERIALIZED_BUFFER_CAPACITY;
+import static org.opendaylight.controller.cluster.datastore.DatastoreContext.DEFAULT_INITIAL_SETTLE_TIMEOUT_MULTIPLIER;
import static org.opendaylight.controller.cluster.datastore.DatastoreContext.DEFAULT_ISOLATED_LEADER_CHECK_INTERVAL_IN_MILLIS;
import static org.opendaylight.controller.cluster.datastore.DatastoreContext.DEFAULT_JOURNAL_RECOVERY_BATCH_SIZE;
import static org.opendaylight.controller.cluster.datastore.DatastoreContext.DEFAULT_MAX_MESSAGE_SLICE_SIZE;
context.getShardInitializationTimeout().duration().toMillis());
assertEquals(DEFAULT_SHARD_LEADER_ELECTION_TIMEOUT.duration().toMillis(),
context.getShardLeaderElectionTimeout().duration().toMillis());
+ assertEquals(DEFAULT_INITIAL_SETTLE_TIMEOUT_MULTIPLIER,
+ context.getInitialSettleTimeoutMultiplier());
assertEquals(DEFAULT_PERSISTENT, context.isPersistent());
assertEquals(DEFAULT_CONFIGURATION_READER, context.getConfigurationReader());
assertEquals(DEFAULT_ISOLATED_LEADER_CHECK_INTERVAL_IN_MILLIS,
TimeUnit.MILLISECONDS);
builder.shardLeaderElectionTimeout(DEFAULT_SHARD_LEADER_ELECTION_TIMEOUT.duration().toMillis() + 1,
TimeUnit.MILLISECONDS);
+ builder.initialSettleTimeoutMultiplier(DEFAULT_INITIAL_SETTLE_TIMEOUT_MULTIPLIER + 1);
builder.persistent(!DEFAULT_PERSISTENT);
builder.shardIsolatedLeaderCheckIntervalInMillis(DEFAULT_ISOLATED_LEADER_CHECK_INTERVAL_IN_MILLIS + 1);
builder.shardSnapshotDataThresholdPercentage(DEFAULT_SHARD_SNAPSHOT_DATA_THRESHOLD_PERCENTAGE + 1);
context.getShardInitializationTimeout().duration().toMillis());
assertEquals(DEFAULT_SHARD_LEADER_ELECTION_TIMEOUT.duration().toMillis() + 1,
context.getShardLeaderElectionTimeout().duration().toMillis());
+ assertEquals(DEFAULT_INITIAL_SETTLE_TIMEOUT_MULTIPLIER + 1,
+ context.getInitialSettleTimeoutMultiplier());
assertEquals(!DEFAULT_PERSISTENT, context.isPersistent());
assertEquals(DEFAULT_CONFIGURATION_READER, context.getConfigurationReader());
assertEquals(DEFAULT_ISOLATED_LEADER_CHECK_INTERVAL_IN_MILLIS + 1,
public void testWaitTillReadyBlocking() {
doReturn(datastoreContext).when(actorUtils).getDatastoreContext();
doReturn(shardElectionTimeout).when(datastoreContext).getShardLeaderElectionTimeout();
+ doReturn(1).when(datastoreContext).getInitialSettleTimeoutMultiplier();
doReturn(FiniteDuration.apply(50, TimeUnit.MILLISECONDS)).when(shardElectionTimeout).duration();
try (DistributedDataStore distributedDataStore = new DistributedDataStore(actorUtils, UNKNOWN_ID)) {