X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FDatastoreContext.java;h=69131a0d1c19a238e0ba57df3786bad3b93caa65;hp=daba3fdf8ac18ac889c348b1af017c679603ebb3;hb=e7e69069ae5ecaacc9ea0e47cb40cdf68237d636;hpb=87eeb0d62755bf5d6bcfd07d40dd8e0ab86c155e diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContext.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContext.java index daba3fdf8a..69131a0d1c 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContext.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContext.java @@ -5,17 +5,28 @@ * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ - package org.opendaylight.controller.cluster.datastore; +import static com.google.common.base.Preconditions.checkArgument; +import static java.util.Objects.requireNonNull; + import akka.util.Timeout; +import com.google.common.annotations.VisibleForTesting; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; -import org.opendaylight.controller.cluster.datastore.config.ConfigurationReader; -import org.opendaylight.controller.cluster.datastore.config.FileConfigurationReader; +import org.apache.commons.text.WordUtils; +import org.opendaylight.controller.cluster.access.client.AbstractClientConnection; +import org.opendaylight.controller.cluster.access.client.ClientActorConfig; +import org.opendaylight.controller.cluster.common.actor.AkkaConfigurationReader; +import org.opendaylight.controller.cluster.common.actor.FileAkkaConfigurationReader; import org.opendaylight.controller.cluster.raft.ConfigParams; import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl; -import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreConfigProperties; -import scala.concurrent.duration.Duration; +import org.opendaylight.controller.cluster.raft.PeerAddressResolver; +import org.opendaylight.mdsal.common.api.LogicalDatastoreType; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import scala.concurrent.duration.FiniteDuration; /** @@ -23,48 +34,145 @@ import scala.concurrent.duration.FiniteDuration; * * @author Thomas Pantelis */ -public class DatastoreContext { - - private final InMemoryDOMDataStoreConfigProperties dataStoreProperties; - private final Duration shardTransactionIdleTimeout; - private final int operationTimeoutInSeconds; - private final String dataStoreMXBeanType; - private final ConfigParams shardRaftConfig; - private final int shardTransactionCommitTimeoutInSeconds; - private final int shardTransactionCommitQueueCapacity; - private final Timeout shardInitializationTimeout; - private final Timeout shardLeaderElectionTimeout; - private final boolean persistent; - private final ConfigurationReader configurationReader; - - private DatastoreContext(InMemoryDOMDataStoreConfigProperties dataStoreProperties, - ConfigParams shardRaftConfig, String dataStoreMXBeanType, int operationTimeoutInSeconds, - Duration shardTransactionIdleTimeout, int shardTransactionCommitTimeoutInSeconds, - int shardTransactionCommitQueueCapacity, Timeout shardInitializationTimeout, - Timeout shardLeaderElectionTimeout, - boolean persistent, ConfigurationReader configurationReader) { - this.dataStoreProperties = dataStoreProperties; - this.shardRaftConfig = shardRaftConfig; - this.dataStoreMXBeanType = dataStoreMXBeanType; - this.operationTimeoutInSeconds = operationTimeoutInSeconds; - this.shardTransactionIdleTimeout = shardTransactionIdleTimeout; - this.shardTransactionCommitTimeoutInSeconds = shardTransactionCommitTimeoutInSeconds; - this.shardTransactionCommitQueueCapacity = shardTransactionCommitQueueCapacity; - this.shardInitializationTimeout = shardInitializationTimeout; - this.shardLeaderElectionTimeout = shardLeaderElectionTimeout; - this.persistent = persistent; - this.configurationReader = configurationReader; +// Non-final for mocking +public class DatastoreContext implements ClientActorConfig { + public static final String METRICS_DOMAIN = "org.opendaylight.controller.cluster.datastore"; + + public static final FiniteDuration DEFAULT_SHARD_TRANSACTION_IDLE_TIMEOUT = FiniteDuration.create(10, + TimeUnit.MINUTES); + public static final int DEFAULT_OPERATION_TIMEOUT_IN_MS = 5000; + public static final int DEFAULT_SHARD_TX_COMMIT_TIMEOUT_IN_SECONDS = 30; + public static final int DEFAULT_JOURNAL_RECOVERY_BATCH_SIZE = 1; + public static final int DEFAULT_SNAPSHOT_BATCH_COUNT = 20000; + public static final int DEFAULT_RECOVERY_SNAPSHOT_INTERVAL_SECONDS = 0; + public static final int DEFAULT_HEARTBEAT_INTERVAL_IN_MILLIS = 500; + public static final int DEFAULT_ISOLATED_LEADER_CHECK_INTERVAL_IN_MILLIS = + DEFAULT_HEARTBEAT_INTERVAL_IN_MILLIS * 10; + public static final int DEFAULT_SHARD_TX_COMMIT_QUEUE_CAPACITY = 50000; + public static final Timeout DEFAULT_SHARD_INITIALIZATION_TIMEOUT = new Timeout(5, TimeUnit.MINUTES); + public static final Timeout DEFAULT_SHARD_LEADER_ELECTION_TIMEOUT = new Timeout(30, TimeUnit.SECONDS); + public static final int DEFAULT_INITIAL_SETTLE_TIMEOUT_MULTIPLIER = 3; + public static final boolean DEFAULT_PERSISTENT = true; + public static final boolean DEFAULT_SNAPSHOT_ON_ROOT_OVERWRITE = false; + public static final FileAkkaConfigurationReader DEFAULT_CONFIGURATION_READER = new FileAkkaConfigurationReader(); + public static final int DEFAULT_SHARD_SNAPSHOT_DATA_THRESHOLD_PERCENTAGE = 12; + public static final int DEFAULT_SHARD_ELECTION_TIMEOUT_FACTOR = 2; + public static final int DEFAULT_SHARD_CANDIDATE_ELECTION_TIMEOUT_DIVISOR = 1; + public static final int DEFAULT_TX_CREATION_INITIAL_RATE_LIMIT = 100; + public static final String UNKNOWN_DATA_STORE_TYPE = "unknown"; + public static final int DEFAULT_SHARD_BATCHED_MODIFICATION_COUNT = 1000; + public static final long DEFAULT_SHARD_COMMIT_QUEUE_EXPIRY_TIMEOUT_IN_MS = + TimeUnit.MILLISECONDS.convert(2, TimeUnit.MINUTES); + public static final int DEFAULT_MAX_MESSAGE_SLICE_SIZE = 2048 * 1000; // 2MB + public static final int DEFAULT_INITIAL_PAYLOAD_SERIALIZED_BUFFER_CAPACITY = 512; + + public static final long DEFAULT_SYNC_INDEX_THRESHOLD = 10; + + private static final Logger LOG = LoggerFactory.getLogger(DatastoreContext.class); + + private static final Set GLOBAL_DATASTORE_NAMES = ConcurrentHashMap.newKeySet(); + + private final DefaultConfigParamsImpl raftConfig = new DefaultConfigParamsImpl(); + + private FiniteDuration shardTransactionIdleTimeout = DatastoreContext.DEFAULT_SHARD_TRANSACTION_IDLE_TIMEOUT; + private long operationTimeoutInMillis = DEFAULT_OPERATION_TIMEOUT_IN_MS; + private String dataStoreMXBeanType; + private int shardTransactionCommitTimeoutInSeconds = DEFAULT_SHARD_TX_COMMIT_TIMEOUT_IN_SECONDS; + private int shardTransactionCommitQueueCapacity = DEFAULT_SHARD_TX_COMMIT_QUEUE_CAPACITY; + private Timeout shardInitializationTimeout = DEFAULT_SHARD_INITIALIZATION_TIMEOUT; + private Timeout shardLeaderElectionTimeout = DEFAULT_SHARD_LEADER_ELECTION_TIMEOUT; + private int initialSettleTimeoutMultiplier = DEFAULT_INITIAL_SETTLE_TIMEOUT_MULTIPLIER; + private boolean persistent = DEFAULT_PERSISTENT; + private boolean snapshotOnRootOverwrite = DEFAULT_SNAPSHOT_ON_ROOT_OVERWRITE; + private AkkaConfigurationReader configurationReader = DEFAULT_CONFIGURATION_READER; + private long transactionCreationInitialRateLimit = DEFAULT_TX_CREATION_INITIAL_RATE_LIMIT; + private String dataStoreName = UNKNOWN_DATA_STORE_TYPE; + private LogicalDatastoreType logicalStoreType = LogicalDatastoreType.OPERATIONAL; + private YangInstanceIdentifier storeRoot = YangInstanceIdentifier.empty(); + private int shardBatchedModificationCount = DEFAULT_SHARD_BATCHED_MODIFICATION_COUNT; + private boolean writeOnlyTransactionOptimizationsEnabled = true; + private long shardCommitQueueExpiryTimeoutInMillis = DEFAULT_SHARD_COMMIT_QUEUE_EXPIRY_TIMEOUT_IN_MS; + private boolean useTellBasedProtocol = false; + private boolean transactionDebugContextEnabled = false; + private String shardManagerPersistenceId; + private int maximumMessageSliceSize = DEFAULT_MAX_MESSAGE_SLICE_SIZE; + private long backendAlivenessTimerInterval = AbstractClientConnection.DEFAULT_BACKEND_ALIVE_TIMEOUT_NANOS; + private long requestTimeout = AbstractClientConnection.DEFAULT_REQUEST_TIMEOUT_NANOS; + private long noProgressTimeout = AbstractClientConnection.DEFAULT_NO_PROGRESS_TIMEOUT_NANOS; + private int initialPayloadSerializedBufferCapacity = DEFAULT_INITIAL_PAYLOAD_SERIALIZED_BUFFER_CAPACITY; + private boolean useLz4Compression = false; + + public static Set getGlobalDatastoreNames() { + return GLOBAL_DATASTORE_NAMES; + } + + DatastoreContext() { + setShardJournalRecoveryLogBatchSize(DEFAULT_JOURNAL_RECOVERY_BATCH_SIZE); + setSnapshotBatchCount(DEFAULT_SNAPSHOT_BATCH_COUNT); + setRecoverySnapshotIntervalSeconds(DEFAULT_RECOVERY_SNAPSHOT_INTERVAL_SECONDS); + setHeartbeatInterval(DEFAULT_HEARTBEAT_INTERVAL_IN_MILLIS); + setIsolatedLeaderCheckInterval(DEFAULT_ISOLATED_LEADER_CHECK_INTERVAL_IN_MILLIS); + setSnapshotDataThresholdPercentage(DEFAULT_SHARD_SNAPSHOT_DATA_THRESHOLD_PERCENTAGE); + setElectionTimeoutFactor(DEFAULT_SHARD_ELECTION_TIMEOUT_FACTOR); + setCandidateElectionTimeoutDivisor(DEFAULT_SHARD_CANDIDATE_ELECTION_TIMEOUT_DIVISOR); + setSyncIndexThreshold(DEFAULT_SYNC_INDEX_THRESHOLD); + setMaximumMessageSliceSize(DEFAULT_MAX_MESSAGE_SLICE_SIZE); + } + + private DatastoreContext(final DatastoreContext other) { + this.shardTransactionIdleTimeout = other.shardTransactionIdleTimeout; + this.operationTimeoutInMillis = other.operationTimeoutInMillis; + this.dataStoreMXBeanType = other.dataStoreMXBeanType; + this.shardTransactionCommitTimeoutInSeconds = other.shardTransactionCommitTimeoutInSeconds; + this.shardTransactionCommitQueueCapacity = other.shardTransactionCommitQueueCapacity; + this.shardInitializationTimeout = other.shardInitializationTimeout; + this.shardLeaderElectionTimeout = other.shardLeaderElectionTimeout; + this.initialSettleTimeoutMultiplier = other.initialSettleTimeoutMultiplier; + this.persistent = other.persistent; + this.snapshotOnRootOverwrite = other.snapshotOnRootOverwrite; + this.configurationReader = other.configurationReader; + this.transactionCreationInitialRateLimit = other.transactionCreationInitialRateLimit; + this.dataStoreName = other.dataStoreName; + this.logicalStoreType = other.logicalStoreType; + this.storeRoot = other.storeRoot; + this.shardBatchedModificationCount = other.shardBatchedModificationCount; + this.writeOnlyTransactionOptimizationsEnabled = other.writeOnlyTransactionOptimizationsEnabled; + this.shardCommitQueueExpiryTimeoutInMillis = other.shardCommitQueueExpiryTimeoutInMillis; + this.transactionDebugContextEnabled = other.transactionDebugContextEnabled; + this.shardManagerPersistenceId = other.shardManagerPersistenceId; + this.useTellBasedProtocol = other.useTellBasedProtocol; + this.backendAlivenessTimerInterval = other.backendAlivenessTimerInterval; + this.requestTimeout = other.requestTimeout; + this.noProgressTimeout = other.noProgressTimeout; + this.initialPayloadSerializedBufferCapacity = other.initialPayloadSerializedBufferCapacity; + this.useLz4Compression = other.useLz4Compression; + + setShardJournalRecoveryLogBatchSize(other.raftConfig.getJournalRecoveryLogBatchSize()); + setSnapshotBatchCount(other.raftConfig.getSnapshotBatchCount()); + setRecoverySnapshotIntervalSeconds(other.raftConfig.getRecoverySnapshotIntervalSeconds()); + setHeartbeatInterval(other.raftConfig.getHeartBeatInterval().toMillis()); + setIsolatedLeaderCheckInterval(other.raftConfig.getIsolatedCheckIntervalInMillis()); + setSnapshotDataThresholdPercentage(other.raftConfig.getSnapshotDataThresholdPercentage()); + setElectionTimeoutFactor(other.raftConfig.getElectionTimeoutFactor()); + setCandidateElectionTimeoutDivisor(other.raftConfig.getCandidateElectionTimeoutDivisor()); + setCustomRaftPolicyImplementation(other.raftConfig.getCustomRaftPolicyImplementationClass()); + setMaximumMessageSliceSize(other.getMaximumMessageSliceSize()); + setShardSnapshotChunkSize(other.raftConfig.getSnapshotChunkSize()); + setPeerAddressResolver(other.raftConfig.getPeerAddressResolver()); + setTempFileDirectory(other.getTempFileDirectory()); + setFileBackedStreamingThreshold(other.getFileBackedStreamingThreshold()); + setSyncIndexThreshold(other.raftConfig.getSyncIndexThreshold()); } public static Builder newBuilder() { - return new Builder(); + return new Builder(new DatastoreContext()); } - public InMemoryDOMDataStoreConfigProperties getDataStoreProperties() { - return dataStoreProperties; + public static Builder newBuilderFrom(final DatastoreContext context) { + return new Builder(new DatastoreContext(context)); } - public Duration getShardTransactionIdleTimeout() { + public FiniteDuration getShardTransactionIdleTimeout() { return shardTransactionIdleTimeout; } @@ -72,12 +180,12 @@ public class DatastoreContext { return dataStoreMXBeanType; } - public int getOperationTimeoutInSeconds() { - return operationTimeoutInSeconds; + public long getOperationTimeoutInMillis() { + return operationTimeoutInMillis; } public ConfigParams getShardRaftConfig() { - return shardRaftConfig; + return raftConfig; } public int getShardTransactionCommitTimeoutInSeconds() { @@ -96,123 +204,480 @@ public class DatastoreContext { return shardLeaderElectionTimeout; } + /** + * Return the multiplier of {@link #getShardLeaderElectionTimeout()} which the frontend will wait for all shards + * on the local node to settle. + * + * @return Non-negative multiplier. Value of {@code 0} indicates to wait indefinitely. + */ + public int getInitialSettleTimeoutMultiplier() { + return initialSettleTimeoutMultiplier; + } + public boolean isPersistent() { return persistent; } - public ConfigurationReader getConfigurationReader() { + public boolean isSnapshotOnRootOverwrite() { + return this.snapshotOnRootOverwrite; + } + + public AkkaConfigurationReader getConfigurationReader() { return configurationReader; } - public static class Builder { - private InMemoryDOMDataStoreConfigProperties dataStoreProperties; - private Duration shardTransactionIdleTimeout = Duration.create(10, TimeUnit.MINUTES); - private int operationTimeoutInSeconds = 5; - private String dataStoreMXBeanType; - private int shardTransactionCommitTimeoutInSeconds = 30; - private int shardJournalRecoveryLogBatchSize = 1000; - private int shardSnapshotBatchCount = 20000; - private int shardHeartbeatIntervalInMillis = 500; - private int shardTransactionCommitQueueCapacity = 20000; - private Timeout shardInitializationTimeout = new Timeout(5, TimeUnit.MINUTES); - private Timeout shardLeaderElectionTimeout = new Timeout(30, TimeUnit.SECONDS); - private boolean persistent = true; - private ConfigurationReader configurationReader = new FileConfigurationReader(); - private int shardIsolatedLeaderCheckIntervalInMillis = shardHeartbeatIntervalInMillis * 10; - private int shardSnapshotDataThresholdPercentage = 12; + public long getShardElectionTimeoutFactor() { + return raftConfig.getElectionTimeoutFactor(); + } + + public String getDataStoreName() { + return dataStoreName; + } + + public LogicalDatastoreType getLogicalStoreType() { + return logicalStoreType; + } + + public YangInstanceIdentifier getStoreRoot() { + return storeRoot; + } + + public long getTransactionCreationInitialRateLimit() { + return transactionCreationInitialRateLimit; + } + + public String getShardManagerPersistenceId() { + return shardManagerPersistenceId; + } + + @Override + public String getTempFileDirectory() { + return raftConfig.getTempFileDirectory(); + } + + private void setTempFileDirectory(final String tempFileDirectory) { + raftConfig.setTempFileDirectory(tempFileDirectory); + } + + @Override + public int getFileBackedStreamingThreshold() { + return raftConfig.getFileBackedStreamingThreshold(); + } + + private void setFileBackedStreamingThreshold(final int fileBackedStreamingThreshold) { + raftConfig.setFileBackedStreamingThreshold(fileBackedStreamingThreshold); + } + + private void setPeerAddressResolver(final PeerAddressResolver resolver) { + raftConfig.setPeerAddressResolver(resolver); + } + + private void setHeartbeatInterval(final long shardHeartbeatIntervalInMillis) { + raftConfig.setHeartBeatInterval(new FiniteDuration(shardHeartbeatIntervalInMillis, + TimeUnit.MILLISECONDS)); + } + + private void setShardJournalRecoveryLogBatchSize(final int shardJournalRecoveryLogBatchSize) { + raftConfig.setJournalRecoveryLogBatchSize(shardJournalRecoveryLogBatchSize); + } + + + private void setIsolatedLeaderCheckInterval(final long shardIsolatedLeaderCheckIntervalInMillis) { + raftConfig.setIsolatedLeaderCheckInterval( + new FiniteDuration(shardIsolatedLeaderCheckIntervalInMillis, TimeUnit.MILLISECONDS)); + } + + private void setElectionTimeoutFactor(final long shardElectionTimeoutFactor) { + raftConfig.setElectionTimeoutFactor(shardElectionTimeoutFactor); + } + + private void setCandidateElectionTimeoutDivisor(final long candidateElectionTimeoutDivisor) { + raftConfig.setCandidateElectionTimeoutDivisor(candidateElectionTimeoutDivisor); + } + + private void setCustomRaftPolicyImplementation(final String customRaftPolicyImplementation) { + raftConfig.setCustomRaftPolicyImplementationClass(customRaftPolicyImplementation); + } + + private void setSnapshotDataThresholdPercentage(final int shardSnapshotDataThresholdPercentage) { + checkArgument(shardSnapshotDataThresholdPercentage >= 0 && shardSnapshotDataThresholdPercentage <= 100); + raftConfig.setSnapshotDataThresholdPercentage(shardSnapshotDataThresholdPercentage); + } + + private void setSnapshotBatchCount(final long shardSnapshotBatchCount) { + raftConfig.setSnapshotBatchCount(shardSnapshotBatchCount); + } + + /** + * Set the interval in seconds after which a snapshot should be taken during the recovery process. + * 0 means don't take snapshots + */ + private void setRecoverySnapshotIntervalSeconds(final int recoverySnapshotInterval) { + raftConfig.setRecoverySnapshotIntervalSeconds(recoverySnapshotInterval); + } + + @Deprecated + private void setShardSnapshotChunkSize(final int shardSnapshotChunkSize) { + // We'll honor the shardSnapshotChunkSize setting for backwards compatibility but only if it doesn't exceed + // maximumMessageSliceSize. + if (shardSnapshotChunkSize < maximumMessageSliceSize) { + raftConfig.setSnapshotChunkSize(shardSnapshotChunkSize); + } + } + + private void setMaximumMessageSliceSize(final int maximumMessageSliceSize) { + raftConfig.setSnapshotChunkSize(maximumMessageSliceSize); + this.maximumMessageSliceSize = maximumMessageSliceSize; + } + + private void setSyncIndexThreshold(final long syncIndexThreshold) { + raftConfig.setSyncIndexThreshold(syncIndexThreshold); + } + + public int getShardBatchedModificationCount() { + return shardBatchedModificationCount; + } + + public boolean isWriteOnlyTransactionOptimizationsEnabled() { + return writeOnlyTransactionOptimizationsEnabled; + } + + public long getShardCommitQueueExpiryTimeoutInMillis() { + return shardCommitQueueExpiryTimeoutInMillis; + } + + public boolean isTransactionDebugContextEnabled() { + return transactionDebugContextEnabled; + } + + public boolean isUseTellBasedProtocol() { + return useTellBasedProtocol; + } + + public boolean isUseLz4Compression() { + return useLz4Compression; + } + + @Override + public int getMaximumMessageSliceSize() { + return maximumMessageSliceSize; + } + + @Override + public long getBackendAlivenessTimerInterval() { + return backendAlivenessTimerInterval; + } + + @Override + public long getRequestTimeout() { + return requestTimeout; + } + + @Override + public long getNoProgressTimeout() { + return noProgressTimeout; + } + + public int getInitialPayloadSerializedBufferCapacity() { + return initialPayloadSerializedBufferCapacity; + } + + public static class Builder implements org.opendaylight.yangtools.concepts.Builder { + private final DatastoreContext datastoreContext; + + Builder(final DatastoreContext datastoreContext) { + this.datastoreContext = datastoreContext; + } + + public Builder boundedMailboxCapacity(final int boundedMailboxCapacity) { + // TODO - this is defined in the yang DataStoreProperties but not currently used. + return this; + } + + public Builder enableMetricCapture(final boolean enableMetricCapture) { + // TODO - this is defined in the yang DataStoreProperties but not currently used. + return this; + } + + + public Builder shardTransactionIdleTimeout(final long timeout, final TimeUnit unit) { + datastoreContext.shardTransactionIdleTimeout = FiniteDuration.create(timeout, unit); + return this; + } + + public Builder shardTransactionIdleTimeoutInMinutes(final long timeout) { + return shardTransactionIdleTimeout(timeout, TimeUnit.MINUTES); + } + + public Builder operationTimeoutInSeconds(final int operationTimeoutInSeconds) { + datastoreContext.operationTimeoutInMillis = TimeUnit.SECONDS.toMillis(operationTimeoutInSeconds); + return this; + } + + public Builder operationTimeoutInMillis(final long operationTimeoutInMillis) { + datastoreContext.operationTimeoutInMillis = operationTimeoutInMillis; + return this; + } + + public Builder dataStoreMXBeanType(final String dataStoreMXBeanType) { + datastoreContext.dataStoreMXBeanType = dataStoreMXBeanType; + return this; + } + + public Builder shardTransactionCommitTimeoutInSeconds(final int shardTransactionCommitTimeoutInSeconds) { + datastoreContext.shardTransactionCommitTimeoutInSeconds = shardTransactionCommitTimeoutInSeconds; + return this; + } + + public Builder shardJournalRecoveryLogBatchSize(final int shardJournalRecoveryLogBatchSize) { + datastoreContext.setShardJournalRecoveryLogBatchSize(shardJournalRecoveryLogBatchSize); + return this; + } + + public Builder shardSnapshotBatchCount(final int shardSnapshotBatchCount) { + datastoreContext.setSnapshotBatchCount(shardSnapshotBatchCount); + return this; + } + + public Builder recoverySnapshotIntervalSeconds(final int recoverySnapshotIntervalSeconds) { + checkArgument(recoverySnapshotIntervalSeconds >= 0); + datastoreContext.setRecoverySnapshotIntervalSeconds(recoverySnapshotIntervalSeconds); + return this; + } + + public Builder shardSnapshotDataThresholdPercentage(final int shardSnapshotDataThresholdPercentage) { + datastoreContext.setSnapshotDataThresholdPercentage(shardSnapshotDataThresholdPercentage); + return this; + } + + public Builder shardHeartbeatIntervalInMillis(final int shardHeartbeatIntervalInMillis) { + datastoreContext.setHeartbeatInterval(shardHeartbeatIntervalInMillis); + return this; + } + + public Builder shardTransactionCommitQueueCapacity(final int shardTransactionCommitQueueCapacity) { + datastoreContext.shardTransactionCommitQueueCapacity = shardTransactionCommitQueueCapacity; + return this; + } + + public Builder shardInitializationTimeout(final long timeout, final TimeUnit unit) { + datastoreContext.shardInitializationTimeout = new Timeout(timeout, unit); + return this; + } + + public Builder shardInitializationTimeoutInSeconds(final long timeout) { + return shardInitializationTimeout(timeout, TimeUnit.SECONDS); + } + + public Builder shardLeaderElectionTimeout(final long timeout, final TimeUnit unit) { + datastoreContext.shardLeaderElectionTimeout = new Timeout(timeout, unit); + return this; + } + + public Builder initialSettleTimeoutMultiplier(final int multiplier) { + checkArgument(multiplier >= 0); + datastoreContext.initialSettleTimeoutMultiplier = multiplier; + return this; + } + + public Builder shardLeaderElectionTimeoutInSeconds(final long timeout) { + return shardLeaderElectionTimeout(timeout, TimeUnit.SECONDS); + } + + public Builder configurationReader(final AkkaConfigurationReader configurationReader) { + datastoreContext.configurationReader = configurationReader; + return this; + } + + public Builder persistent(final boolean persistent) { + datastoreContext.persistent = persistent; + return this; + } - public Builder shardTransactionIdleTimeout(Duration shardTransactionIdleTimeout) { - this.shardTransactionIdleTimeout = shardTransactionIdleTimeout; + public Builder snapshotOnRootOverwrite(final boolean snapshotOnRootOverwrite) { + datastoreContext.snapshotOnRootOverwrite = snapshotOnRootOverwrite; return this; } - public Builder operationTimeoutInSeconds(int operationTimeoutInSeconds) { - this.operationTimeoutInSeconds = operationTimeoutInSeconds; + public Builder shardIsolatedLeaderCheckIntervalInMillis(final int shardIsolatedLeaderCheckIntervalInMillis) { + datastoreContext.setIsolatedLeaderCheckInterval(shardIsolatedLeaderCheckIntervalInMillis); return this; } - public Builder dataStoreMXBeanType(String dataStoreMXBeanType) { - this.dataStoreMXBeanType = dataStoreMXBeanType; + public Builder shardElectionTimeoutFactor(final long shardElectionTimeoutFactor) { + datastoreContext.setElectionTimeoutFactor(shardElectionTimeoutFactor); return this; } - public Builder dataStoreProperties(InMemoryDOMDataStoreConfigProperties dataStoreProperties) { - this.dataStoreProperties = dataStoreProperties; + public Builder shardCandidateElectionTimeoutDivisor(final long candidateElectionTimeoutDivisor) { + datastoreContext.setCandidateElectionTimeoutDivisor(candidateElectionTimeoutDivisor); return this; } - public Builder shardTransactionCommitTimeoutInSeconds(int shardTransactionCommitTimeoutInSeconds) { - this.shardTransactionCommitTimeoutInSeconds = shardTransactionCommitTimeoutInSeconds; + public Builder transactionCreationInitialRateLimit(final long initialRateLimit) { + datastoreContext.transactionCreationInitialRateLimit = initialRateLimit; return this; } - public Builder shardJournalRecoveryLogBatchSize(int shardJournalRecoveryLogBatchSize) { - this.shardJournalRecoveryLogBatchSize = shardJournalRecoveryLogBatchSize; + public Builder logicalStoreType(final LogicalDatastoreType logicalStoreType) { + datastoreContext.logicalStoreType = requireNonNull(logicalStoreType); + + // Retain compatible naming + switch (logicalStoreType) { + case CONFIGURATION: + dataStoreName("config"); + break; + case OPERATIONAL: + dataStoreName("operational"); + break; + default: + dataStoreName(logicalStoreType.name()); + } + + return this; + } + + public Builder storeRoot(final YangInstanceIdentifier storeRoot) { + datastoreContext.storeRoot = storeRoot; + return this; + } + + public Builder dataStoreName(final String dataStoreName) { + datastoreContext.dataStoreName = requireNonNull(dataStoreName); + datastoreContext.dataStoreMXBeanType = "Distributed" + WordUtils.capitalize(dataStoreName) + "Datastore"; + return this; + } + + public Builder shardBatchedModificationCount(final int shardBatchedModificationCount) { + datastoreContext.shardBatchedModificationCount = shardBatchedModificationCount; + return this; + } + + public Builder writeOnlyTransactionOptimizationsEnabled(final boolean value) { + datastoreContext.writeOnlyTransactionOptimizationsEnabled = value; + return this; + } + + public Builder shardCommitQueueExpiryTimeoutInMillis(final long value) { + datastoreContext.shardCommitQueueExpiryTimeoutInMillis = value; + return this; + } + + public Builder shardCommitQueueExpiryTimeoutInSeconds(final long value) { + datastoreContext.shardCommitQueueExpiryTimeoutInMillis = TimeUnit.MILLISECONDS.convert( + value, TimeUnit.SECONDS); + return this; + } + + public Builder transactionDebugContextEnabled(final boolean value) { + datastoreContext.transactionDebugContextEnabled = value; + return this; + } + + @Deprecated(forRemoval = true) + public Builder maxShardDataChangeExecutorPoolSize(final int newMaxShardDataChangeExecutorPoolSize) { + return this; + } + + @Deprecated(forRemoval = true) + public Builder maxShardDataChangeExecutorQueueSize(final int newMaxShardDataChangeExecutorQueueSize) { + return this; + } + + @Deprecated(forRemoval = true) + public Builder maxShardDataChangeListenerQueueSize(final int newMaxShardDataChangeListenerQueueSize) { + return this; + } + + @Deprecated(forRemoval = true) + public Builder maxShardDataStoreExecutorQueueSize(final int newMaxShardDataStoreExecutorQueueSize) { + return this; + } + + public Builder useTellBasedProtocol(final boolean value) { + datastoreContext.useTellBasedProtocol = value; + return this; + } + + public Builder useLz4Compression(final boolean value) { + datastoreContext.useLz4Compression = value; + return this; + } + + /** + * For unit tests only. + */ + @VisibleForTesting + public Builder shardManagerPersistenceId(final String id) { + datastoreContext.shardManagerPersistenceId = id; return this; } - public Builder shardSnapshotBatchCount(int shardSnapshotBatchCount) { - this.shardSnapshotBatchCount = shardSnapshotBatchCount; + public Builder customRaftPolicyImplementation(final String customRaftPolicyImplementation) { + datastoreContext.setCustomRaftPolicyImplementation(customRaftPolicyImplementation); return this; } - public Builder shardSnapshotDataThresholdPercentage(int shardSnapshotDataThresholdPercentage) { - this.shardSnapshotDataThresholdPercentage = shardSnapshotDataThresholdPercentage; + @Deprecated + public Builder shardSnapshotChunkSize(final int shardSnapshotChunkSize) { + LOG.warn("The shard-snapshot-chunk-size configuration parameter is deprecated - " + + "use maximum-message-slice-size instead"); + datastoreContext.setShardSnapshotChunkSize(shardSnapshotChunkSize); return this; } + public Builder maximumMessageSliceSize(final int maximumMessageSliceSize) { + datastoreContext.setMaximumMessageSliceSize(maximumMessageSliceSize); + return this; + } - public Builder shardHeartbeatIntervalInMillis(int shardHeartbeatIntervalInMillis) { - this.shardHeartbeatIntervalInMillis = shardHeartbeatIntervalInMillis; + public Builder shardPeerAddressResolver(final PeerAddressResolver resolver) { + datastoreContext.setPeerAddressResolver(resolver); return this; } - public Builder shardTransactionCommitQueueCapacity(int shardTransactionCommitQueueCapacity) { - this.shardTransactionCommitQueueCapacity = shardTransactionCommitQueueCapacity; + public Builder tempFileDirectory(final String tempFileDirectory) { + datastoreContext.setTempFileDirectory(tempFileDirectory); return this; } - public Builder shardInitializationTimeout(long timeout, TimeUnit unit) { - this.shardInitializationTimeout = new Timeout(timeout, unit); + public Builder fileBackedStreamingThresholdInMegabytes(final int fileBackedStreamingThreshold) { + datastoreContext.setFileBackedStreamingThreshold(fileBackedStreamingThreshold * ConfigParams.MEGABYTE); return this; } - public Builder shardLeaderElectionTimeout(long timeout, TimeUnit unit) { - this.shardLeaderElectionTimeout = new Timeout(timeout, unit); + public Builder syncIndexThreshold(final long syncIndexThreshold) { + datastoreContext.setSyncIndexThreshold(syncIndexThreshold); return this; } - public Builder configurationReader(ConfigurationReader configurationReader){ - this.configurationReader = configurationReader; + public Builder backendAlivenessTimerIntervalInSeconds(final long interval) { + datastoreContext.backendAlivenessTimerInterval = TimeUnit.SECONDS.toNanos(interval); return this; } - public Builder persistent(boolean persistent){ - this.persistent = persistent; + public Builder frontendRequestTimeoutInSeconds(final long timeout) { + datastoreContext.requestTimeout = TimeUnit.SECONDS.toNanos(timeout); return this; } - public Builder shardIsolatedLeaderCheckIntervalInMillis(int shardIsolatedLeaderCheckIntervalInMillis) { - this.shardIsolatedLeaderCheckIntervalInMillis = shardIsolatedLeaderCheckIntervalInMillis; + public Builder frontendNoProgressTimeoutInSeconds(final long timeout) { + datastoreContext.noProgressTimeout = TimeUnit.SECONDS.toNanos(timeout); return this; } + public Builder initialPayloadSerializedBufferCapacity(final int capacity) { + datastoreContext.initialPayloadSerializedBufferCapacity = capacity; + return this; + } + @Override public DatastoreContext build() { - DefaultConfigParamsImpl raftConfig = new DefaultConfigParamsImpl(); - raftConfig.setHeartBeatInterval(new FiniteDuration(shardHeartbeatIntervalInMillis, - TimeUnit.MILLISECONDS)); - raftConfig.setJournalRecoveryLogBatchSize(shardJournalRecoveryLogBatchSize); - raftConfig.setSnapshotBatchCount(shardSnapshotBatchCount); - raftConfig.setSnapshotDataThresholdPercentage(shardSnapshotDataThresholdPercentage); - raftConfig.setIsolatedLeaderCheckInterval( - new FiniteDuration(shardIsolatedLeaderCheckIntervalInMillis, TimeUnit.MILLISECONDS)); + if (datastoreContext.dataStoreName != null) { + GLOBAL_DATASTORE_NAMES.add(datastoreContext.dataStoreName); + } - return new DatastoreContext(dataStoreProperties, raftConfig, dataStoreMXBeanType, - operationTimeoutInSeconds, shardTransactionIdleTimeout, - shardTransactionCommitTimeoutInSeconds, shardTransactionCommitQueueCapacity, - shardInitializationTimeout, shardLeaderElectionTimeout, - persistent, configurationReader); + return datastoreContext; } } }