X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FDatastoreContext.java;h=0536e4b15a382c4913fa29ee292dcfc6c9b4b12f;hp=d5142c94a68b53311293de80cf5fc73d9415ed9b;hb=925cb4a228d0fda99c7bfeb432eb25285a223887;hpb=6dcee56392712348b1abdcdc0d1d5f94dfcf505c diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContext.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContext.java index d5142c94a6..0536e4b15a 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContext.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContext.java @@ -9,12 +9,18 @@ package org.opendaylight.controller.cluster.datastore; import akka.util.Timeout; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.collect.Sets; +import java.util.Set; import java.util.concurrent.TimeUnit; import org.apache.commons.lang3.text.WordUtils; -import org.opendaylight.controller.cluster.datastore.config.ConfigurationReader; -import org.opendaylight.controller.cluster.datastore.config.FileConfigurationReader; +import org.opendaylight.controller.cluster.common.actor.AkkaConfigurationReader; +import org.opendaylight.controller.cluster.common.actor.FileAkkaConfigurationReader; import org.opendaylight.controller.cluster.raft.ConfigParams; import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl; +import org.opendaylight.controller.cluster.raft.PeerAddressResolver; +import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreConfigProperties; import scala.concurrent.duration.Duration; import scala.concurrent.duration.FiniteDuration; @@ -25,40 +31,55 @@ import scala.concurrent.duration.FiniteDuration; * @author Thomas Pantelis */ public class DatastoreContext { + public static final String METRICS_DOMAIN = "org.opendaylight.controller.cluster.datastore"; public static final Duration DEFAULT_SHARD_TRANSACTION_IDLE_TIMEOUT = Duration.create(10, TimeUnit.MINUTES); - public static final int DEFAULT_OPERATION_TIMEOUT_IN_SECONDS = 5; + public static final int DEFAULT_OPERATION_TIMEOUT_IN_MS = 5000; public static final int DEFAULT_SHARD_TX_COMMIT_TIMEOUT_IN_SECONDS = 30; - public static final int DEFAULT_JOURNAL_RECOVERY_BATCH_SIZE = 1000; + public static final int DEFAULT_JOURNAL_RECOVERY_BATCH_SIZE = 1; public static final int DEFAULT_SNAPSHOT_BATCH_COUNT = 20000; public static final int DEFAULT_HEARTBEAT_INTERVAL_IN_MILLIS = 500; - public static final int DEFAULT_ISOLATED_LEADER_CHECK_INTERVAL_IN_MILLIS = DEFAULT_HEARTBEAT_INTERVAL_IN_MILLIS * 10; - public static final int DEFAULT_SHARD_TX_COMMIT_QUEUE_CAPACITY = 20000; + public static final int DEFAULT_ISOLATED_LEADER_CHECK_INTERVAL_IN_MILLIS = + DEFAULT_HEARTBEAT_INTERVAL_IN_MILLIS * 10; + public static final int DEFAULT_SHARD_TX_COMMIT_QUEUE_CAPACITY = 50000; public static final Timeout DEFAULT_SHARD_INITIALIZATION_TIMEOUT = new Timeout(5, TimeUnit.MINUTES); public static final Timeout DEFAULT_SHARD_LEADER_ELECTION_TIMEOUT = new Timeout(30, TimeUnit.SECONDS); public static final boolean DEFAULT_PERSISTENT = true; - public static final FileConfigurationReader DEFAULT_CONFIGURATION_READER = new FileConfigurationReader(); + public static final FileAkkaConfigurationReader DEFAULT_CONFIGURATION_READER = new FileAkkaConfigurationReader(); public static final int DEFAULT_SHARD_SNAPSHOT_DATA_THRESHOLD_PERCENTAGE = 12; public static final int DEFAULT_SHARD_ELECTION_TIMEOUT_FACTOR = 2; public static final int DEFAULT_TX_CREATION_INITIAL_RATE_LIMIT = 100; public static final String UNKNOWN_DATA_STORE_TYPE = "unknown"; - public static final int DEFAULT_SHARD_BATCHED_MODIFICATION_COUNT= 100; + public static final int DEFAULT_SHARD_BATCHED_MODIFICATION_COUNT = 1000; + public static final long DEFAULT_SHARD_COMMIT_QUEUE_EXPIRY_TIMEOUT_IN_MS = + TimeUnit.MILLISECONDS.convert(2, TimeUnit.MINUTES); + public static final int DEFAULT_SHARD_SNAPSHOT_CHUNK_SIZE = 2048000; + + private static final Set GLOBAL_DATASTORE_NAMES = Sets.newConcurrentHashSet(); private InMemoryDOMDataStoreConfigProperties dataStoreProperties; private Duration shardTransactionIdleTimeout = DatastoreContext.DEFAULT_SHARD_TRANSACTION_IDLE_TIMEOUT; - private int operationTimeoutInSeconds = DEFAULT_OPERATION_TIMEOUT_IN_SECONDS; + private long operationTimeoutInMillis = DEFAULT_OPERATION_TIMEOUT_IN_MS; private String dataStoreMXBeanType; private int shardTransactionCommitTimeoutInSeconds = DEFAULT_SHARD_TX_COMMIT_TIMEOUT_IN_SECONDS; private int shardTransactionCommitQueueCapacity = DEFAULT_SHARD_TX_COMMIT_QUEUE_CAPACITY; private Timeout shardInitializationTimeout = DEFAULT_SHARD_INITIALIZATION_TIMEOUT; private Timeout shardLeaderElectionTimeout = DEFAULT_SHARD_LEADER_ELECTION_TIMEOUT; private boolean persistent = DEFAULT_PERSISTENT; - private ConfigurationReader configurationReader = DEFAULT_CONFIGURATION_READER; + private AkkaConfigurationReader configurationReader = DEFAULT_CONFIGURATION_READER; private long transactionCreationInitialRateLimit = DEFAULT_TX_CREATION_INITIAL_RATE_LIMIT; private final DefaultConfigParamsImpl raftConfig = new DefaultConfigParamsImpl(); - private String dataStoreType = UNKNOWN_DATA_STORE_TYPE; + private String dataStoreName = UNKNOWN_DATA_STORE_TYPE; + private LogicalDatastoreType logicalStoreType = LogicalDatastoreType.OPERATIONAL; private int shardBatchedModificationCount = DEFAULT_SHARD_BATCHED_MODIFICATION_COUNT; - private boolean writeOnlyTransactionOptimizationsEnabled = false; + private boolean writeOnlyTransactionOptimizationsEnabled = true; + private long shardCommitQueueExpiryTimeoutInMillis = DEFAULT_SHARD_COMMIT_QUEUE_EXPIRY_TIMEOUT_IN_MS; + private boolean transactionDebugContextEnabled = false; + private String shardManagerPersistenceId; + + public static Set getGlobalDatastoreNames() { + return GLOBAL_DATASTORE_NAMES; + } private DatastoreContext() { setShardJournalRecoveryLogBatchSize(DEFAULT_JOURNAL_RECOVERY_BATCH_SIZE); @@ -67,12 +88,13 @@ public class DatastoreContext { setIsolatedLeaderCheckInterval(DEFAULT_ISOLATED_LEADER_CHECK_INTERVAL_IN_MILLIS); setSnapshotDataThresholdPercentage(DEFAULT_SHARD_SNAPSHOT_DATA_THRESHOLD_PERCENTAGE); setElectionTimeoutFactor(DEFAULT_SHARD_ELECTION_TIMEOUT_FACTOR); + setShardSnapshotChunkSize(DEFAULT_SHARD_SNAPSHOT_CHUNK_SIZE); } private DatastoreContext(DatastoreContext other) { this.dataStoreProperties = other.dataStoreProperties; this.shardTransactionIdleTimeout = other.shardTransactionIdleTimeout; - this.operationTimeoutInSeconds = other.operationTimeoutInSeconds; + this.operationTimeoutInMillis = other.operationTimeoutInMillis; this.dataStoreMXBeanType = other.dataStoreMXBeanType; this.shardTransactionCommitTimeoutInSeconds = other.shardTransactionCommitTimeoutInSeconds; this.shardTransactionCommitQueueCapacity = other.shardTransactionCommitQueueCapacity; @@ -81,9 +103,13 @@ public class DatastoreContext { this.persistent = other.persistent; this.configurationReader = other.configurationReader; this.transactionCreationInitialRateLimit = other.transactionCreationInitialRateLimit; - this.dataStoreType = other.dataStoreType; + this.dataStoreName = other.dataStoreName; + this.logicalStoreType = other.logicalStoreType; this.shardBatchedModificationCount = other.shardBatchedModificationCount; this.writeOnlyTransactionOptimizationsEnabled = other.writeOnlyTransactionOptimizationsEnabled; + this.shardCommitQueueExpiryTimeoutInMillis = other.shardCommitQueueExpiryTimeoutInMillis; + this.transactionDebugContextEnabled = other.transactionDebugContextEnabled; + this.shardManagerPersistenceId = other.shardManagerPersistenceId; setShardJournalRecoveryLogBatchSize(other.raftConfig.getJournalRecoveryLogBatchSize()); setSnapshotBatchCount(other.raftConfig.getSnapshotBatchCount()); @@ -91,6 +117,9 @@ public class DatastoreContext { setIsolatedLeaderCheckInterval(other.raftConfig.getIsolatedCheckIntervalInMillis()); setSnapshotDataThresholdPercentage(other.raftConfig.getSnapshotDataThresholdPercentage()); setElectionTimeoutFactor(other.raftConfig.getElectionTimeoutFactor()); + setCustomRaftPolicyImplementation(other.raftConfig.getCustomRaftPolicyImplementationClass()); + setShardSnapshotChunkSize(other.raftConfig.getSnapshotChunkSize()); + setPeerAddressResolver(other.raftConfig.getPeerAddressResolver()); } public static Builder newBuilder() { @@ -113,8 +142,8 @@ public class DatastoreContext { return dataStoreMXBeanType; } - public int getOperationTimeoutInSeconds() { - return operationTimeoutInSeconds; + public long getOperationTimeoutInMillis() { + return operationTimeoutInMillis; } public ConfigParams getShardRaftConfig() { @@ -141,28 +170,40 @@ public class DatastoreContext { return persistent; } - public ConfigurationReader getConfigurationReader() { + public AkkaConfigurationReader getConfigurationReader() { return configurationReader; } - public long getShardElectionTimeoutFactor(){ + public long getShardElectionTimeoutFactor() { return raftConfig.getElectionTimeoutFactor(); } - public String getDataStoreType(){ - return dataStoreType; + public String getDataStoreName() { + return dataStoreName; + } + + public LogicalDatastoreType getLogicalStoreType() { + return logicalStoreType; } public long getTransactionCreationInitialRateLimit() { return transactionCreationInitialRateLimit; } - private void setHeartbeatInterval(long shardHeartbeatIntervalInMillis){ + public String getShardManagerPersistenceId() { + return shardManagerPersistenceId; + } + + private void setPeerAddressResolver(PeerAddressResolver resolver) { + raftConfig.setPeerAddressResolver(resolver); + } + + private void setHeartbeatInterval(long shardHeartbeatIntervalInMillis) { raftConfig.setHeartBeatInterval(new FiniteDuration(shardHeartbeatIntervalInMillis, TimeUnit.MILLISECONDS)); } - private void setShardJournalRecoveryLogBatchSize(int shardJournalRecoveryLogBatchSize){ + private void setShardJournalRecoveryLogBatchSize(int shardJournalRecoveryLogBatchSize) { raftConfig.setJournalRecoveryLogBatchSize(shardJournalRecoveryLogBatchSize); } @@ -176,7 +217,13 @@ public class DatastoreContext { raftConfig.setElectionTimeoutFactor(shardElectionTimeoutFactor); } + private void setCustomRaftPolicyImplementation(String customRaftPolicyImplementation) { + raftConfig.setCustomRaftPolicyImplementationClass(customRaftPolicyImplementation); + } + private void setSnapshotDataThresholdPercentage(int shardSnapshotDataThresholdPercentage) { + Preconditions.checkArgument(shardSnapshotDataThresholdPercentage >= 0 + && shardSnapshotDataThresholdPercentage <= 100); raftConfig.setSnapshotDataThresholdPercentage(shardSnapshotDataThresholdPercentage); } @@ -184,6 +231,10 @@ public class DatastoreContext { raftConfig.setSnapshotBatchCount(shardSnapshotBatchCount); } + private void setShardSnapshotChunkSize(int shardSnapshotChunkSize) { + raftConfig.setSnapshotChunkSize(shardSnapshotChunkSize); + } + public int getShardBatchedModificationCount() { return shardBatchedModificationCount; } @@ -192,6 +243,18 @@ public class DatastoreContext { return writeOnlyTransactionOptimizationsEnabled; } + public long getShardCommitQueueExpiryTimeoutInMillis() { + return shardCommitQueueExpiryTimeoutInMillis; + } + + public boolean isTransactionDebugContextEnabled() { + return transactionDebugContextEnabled; + } + + public int getShardSnapshotChunkSize() { + return raftConfig.getSnapshotChunkSize(); + } + public static class Builder { private final DatastoreContext datastoreContext; private int maxShardDataChangeExecutorPoolSize = @@ -206,7 +269,7 @@ public class DatastoreContext { private Builder(DatastoreContext datastoreContext) { this.datastoreContext = datastoreContext; - if(datastoreContext.getDataStoreProperties() != null) { + if (datastoreContext.getDataStoreProperties() != null) { maxShardDataChangeExecutorPoolSize = datastoreContext.getDataStoreProperties().getMaxDataChangeExecutorPoolSize(); maxShardDataChangeExecutorQueueSize = @@ -239,7 +302,12 @@ public class DatastoreContext { } public Builder operationTimeoutInSeconds(int operationTimeoutInSeconds) { - datastoreContext.operationTimeoutInSeconds = operationTimeoutInSeconds; + datastoreContext.operationTimeoutInMillis = TimeUnit.SECONDS.toMillis(operationTimeoutInSeconds); + return this; + } + + public Builder operationTimeoutInMillis(long operationTimeoutInMillis) { + datastoreContext.operationTimeoutInMillis = operationTimeoutInMillis; return this; } @@ -296,12 +364,12 @@ public class DatastoreContext { return shardLeaderElectionTimeout(timeout, TimeUnit.SECONDS); } - public Builder configurationReader(ConfigurationReader configurationReader){ + public Builder configurationReader(AkkaConfigurationReader configurationReader) { datastoreContext.configurationReader = configurationReader; return this; } - public Builder persistent(boolean persistent){ + public Builder persistent(boolean persistent) { datastoreContext.persistent = persistent; return this; } @@ -311,19 +379,37 @@ public class DatastoreContext { return this; } - public Builder shardElectionTimeoutFactor(long shardElectionTimeoutFactor){ + public Builder shardElectionTimeoutFactor(long shardElectionTimeoutFactor) { datastoreContext.setElectionTimeoutFactor(shardElectionTimeoutFactor); return this; } - public Builder transactionCreationInitialRateLimit(long initialRateLimit){ + public Builder transactionCreationInitialRateLimit(long initialRateLimit) { datastoreContext.transactionCreationInitialRateLimit = initialRateLimit; return this; } - public Builder dataStoreType(String dataStoreType){ - datastoreContext.dataStoreType = dataStoreType; - datastoreContext.dataStoreMXBeanType = "Distributed" + WordUtils.capitalize(dataStoreType) + "Datastore"; + public Builder logicalStoreType(LogicalDatastoreType logicalStoreType) { + datastoreContext.logicalStoreType = Preconditions.checkNotNull(logicalStoreType); + + // Retain compatible naming + switch (logicalStoreType) { + case CONFIGURATION: + dataStoreName("config"); + break; + case OPERATIONAL: + dataStoreName("operational"); + break; + default: + dataStoreName(logicalStoreType.name()); + } + + return this; + } + + public Builder dataStoreName(String dataStoreName) { + datastoreContext.dataStoreName = Preconditions.checkNotNull(dataStoreName); + datastoreContext.dataStoreMXBeanType = "Distributed" + WordUtils.capitalize(dataStoreName) + "Datastore"; return this; } @@ -337,6 +423,22 @@ public class DatastoreContext { return this; } + public Builder shardCommitQueueExpiryTimeoutInMillis(long value) { + datastoreContext.shardCommitQueueExpiryTimeoutInMillis = value; + return this; + } + + public Builder shardCommitQueueExpiryTimeoutInSeconds(long value) { + datastoreContext.shardCommitQueueExpiryTimeoutInMillis = TimeUnit.MILLISECONDS.convert( + value, TimeUnit.SECONDS); + return this; + } + + public Builder transactionDebugContextEnabled(boolean value) { + datastoreContext.transactionDebugContextEnabled = value; + return this; + } + public Builder maxShardDataChangeExecutorPoolSize(int maxShardDataChangeExecutorPoolSize) { this.maxShardDataChangeExecutorPoolSize = maxShardDataChangeExecutorPoolSize; return this; @@ -357,11 +459,40 @@ public class DatastoreContext { return this; } + /** + * For unit tests only. + */ + @VisibleForTesting + public Builder shardManagerPersistenceId(String id) { + datastoreContext.shardManagerPersistenceId = id; + return this; + } + public DatastoreContext build() { datastoreContext.dataStoreProperties = InMemoryDOMDataStoreConfigProperties.create( maxShardDataChangeExecutorPoolSize, maxShardDataChangeExecutorQueueSize, maxShardDataChangeListenerQueueSize, maxShardDataStoreExecutorQueueSize); + + if (datastoreContext.dataStoreName != null) { + GLOBAL_DATASTORE_NAMES.add(datastoreContext.dataStoreName); + } + return datastoreContext; } + + public Builder customRaftPolicyImplementation(String customRaftPolicyImplementation) { + datastoreContext.setCustomRaftPolicyImplementation(customRaftPolicyImplementation); + return this; + } + + public Builder shardSnapshotChunkSize(int shardSnapshotChunkSize) { + datastoreContext.setShardSnapshotChunkSize(shardSnapshotChunkSize); + return this; + } + + public Builder shardPeerAddressResolver(PeerAddressResolver resolver) { + datastoreContext.setPeerAddressResolver(resolver); + return this; + } } }