X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FDatastoreContext.java;h=eeb6ad3155e2bc91d9e910728d589cca2084603b;hp=c8be6aba9dc51f79493925d66a3d80ab27546737;hb=2634ed7138a343f051ff6452ccc7edd3abfc0c3a;hpb=6c4f573a4c01d2f21565154d6b183b45466bd3bd diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContext.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContext.java index c8be6aba9d..eeb6ad3155 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContext.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContext.java @@ -10,6 +10,7 @@ package org.opendaylight.controller.cluster.datastore; import akka.util.Timeout; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; import com.google.common.collect.Sets; import java.util.Set; import java.util.concurrent.TimeUnit; @@ -19,7 +20,9 @@ import org.opendaylight.controller.cluster.common.actor.FileAkkaConfigurationRea import org.opendaylight.controller.cluster.raft.ConfigParams; import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl; import org.opendaylight.controller.cluster.raft.PeerAddressResolver; +import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreConfigProperties; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import scala.concurrent.duration.Duration; import scala.concurrent.duration.FiniteDuration; @@ -34,10 +37,11 @@ public class DatastoreContext { public static final Duration DEFAULT_SHARD_TRANSACTION_IDLE_TIMEOUT = Duration.create(10, TimeUnit.MINUTES); public static final int DEFAULT_OPERATION_TIMEOUT_IN_MS = 5000; public static final int DEFAULT_SHARD_TX_COMMIT_TIMEOUT_IN_SECONDS = 30; - public static final int DEFAULT_JOURNAL_RECOVERY_BATCH_SIZE = 1000; + public static final int DEFAULT_JOURNAL_RECOVERY_BATCH_SIZE = 1; public static final int DEFAULT_SNAPSHOT_BATCH_COUNT = 20000; public static final int DEFAULT_HEARTBEAT_INTERVAL_IN_MILLIS = 500; - public static final int DEFAULT_ISOLATED_LEADER_CHECK_INTERVAL_IN_MILLIS = DEFAULT_HEARTBEAT_INTERVAL_IN_MILLIS * 10; + public static final int DEFAULT_ISOLATED_LEADER_CHECK_INTERVAL_IN_MILLIS = + DEFAULT_HEARTBEAT_INTERVAL_IN_MILLIS * 10; public static final int DEFAULT_SHARD_TX_COMMIT_QUEUE_CAPACITY = 50000; public static final Timeout DEFAULT_SHARD_INITIALIZATION_TIMEOUT = new Timeout(5, TimeUnit.MINUTES); public static final Timeout DEFAULT_SHARD_LEADER_ELECTION_TIMEOUT = new Timeout(30, TimeUnit.SECONDS); @@ -48,10 +52,11 @@ public class DatastoreContext { public static final int DEFAULT_TX_CREATION_INITIAL_RATE_LIMIT = 100; public static final String UNKNOWN_DATA_STORE_TYPE = "unknown"; public static final int DEFAULT_SHARD_BATCHED_MODIFICATION_COUNT = 1000; - public static final long DEFAULT_SHARD_COMMIT_QUEUE_EXPIRY_TIMEOUT_IN_MS = TimeUnit.MILLISECONDS.convert(2, TimeUnit.MINUTES); + public static final long DEFAULT_SHARD_COMMIT_QUEUE_EXPIRY_TIMEOUT_IN_MS = + TimeUnit.MILLISECONDS.convert(2, TimeUnit.MINUTES); public static final int DEFAULT_SHARD_SNAPSHOT_CHUNK_SIZE = 2048000; - private static Set globalDatastoreTypes = Sets.newConcurrentHashSet(); + private static final Set GLOBAL_DATASTORE_NAMES = Sets.newConcurrentHashSet(); private InMemoryDOMDataStoreConfigProperties dataStoreProperties; private Duration shardTransactionIdleTimeout = DatastoreContext.DEFAULT_SHARD_TRANSACTION_IDLE_TIMEOUT; @@ -65,15 +70,18 @@ public class DatastoreContext { private AkkaConfigurationReader configurationReader = DEFAULT_CONFIGURATION_READER; private long transactionCreationInitialRateLimit = DEFAULT_TX_CREATION_INITIAL_RATE_LIMIT; private final DefaultConfigParamsImpl raftConfig = new DefaultConfigParamsImpl(); - private String dataStoreType = UNKNOWN_DATA_STORE_TYPE; + private String dataStoreName = UNKNOWN_DATA_STORE_TYPE; + private LogicalDatastoreType logicalStoreType = LogicalDatastoreType.OPERATIONAL; + private YangInstanceIdentifier storeRoot = YangInstanceIdentifier.EMPTY; private int shardBatchedModificationCount = DEFAULT_SHARD_BATCHED_MODIFICATION_COUNT; private boolean writeOnlyTransactionOptimizationsEnabled = true; private long shardCommitQueueExpiryTimeoutInMillis = DEFAULT_SHARD_COMMIT_QUEUE_EXPIRY_TIMEOUT_IN_MS; + private boolean useTellBasedProtocol = false; private boolean transactionDebugContextEnabled = false; private String shardManagerPersistenceId; - public static Set getGlobalDatastoreTypes() { - return globalDatastoreTypes; + public static Set getGlobalDatastoreNames() { + return GLOBAL_DATASTORE_NAMES; } private DatastoreContext() { @@ -86,7 +94,7 @@ public class DatastoreContext { setShardSnapshotChunkSize(DEFAULT_SHARD_SNAPSHOT_CHUNK_SIZE); } - private DatastoreContext(DatastoreContext other) { + private DatastoreContext(final DatastoreContext other) { this.dataStoreProperties = other.dataStoreProperties; this.shardTransactionIdleTimeout = other.shardTransactionIdleTimeout; this.operationTimeoutInMillis = other.operationTimeoutInMillis; @@ -98,12 +106,15 @@ public class DatastoreContext { this.persistent = other.persistent; this.configurationReader = other.configurationReader; this.transactionCreationInitialRateLimit = other.transactionCreationInitialRateLimit; - this.dataStoreType = other.dataStoreType; + this.dataStoreName = other.dataStoreName; + this.logicalStoreType = other.logicalStoreType; + this.storeRoot = other.storeRoot; this.shardBatchedModificationCount = other.shardBatchedModificationCount; this.writeOnlyTransactionOptimizationsEnabled = other.writeOnlyTransactionOptimizationsEnabled; this.shardCommitQueueExpiryTimeoutInMillis = other.shardCommitQueueExpiryTimeoutInMillis; this.transactionDebugContextEnabled = other.transactionDebugContextEnabled; this.shardManagerPersistenceId = other.shardManagerPersistenceId; + this.useTellBasedProtocol = other.useTellBasedProtocol; setShardJournalRecoveryLogBatchSize(other.raftConfig.getJournalRecoveryLogBatchSize()); setSnapshotBatchCount(other.raftConfig.getSnapshotBatchCount()); @@ -114,6 +125,8 @@ public class DatastoreContext { setCustomRaftPolicyImplementation(other.raftConfig.getCustomRaftPolicyImplementationClass()); setShardSnapshotChunkSize(other.raftConfig.getSnapshotChunkSize()); setPeerAddressResolver(other.raftConfig.getPeerAddressResolver()); + setTempFileDirectory(other.getTempFileDirectory()); + setFileBackedStreamingThreshold(other.getFileBackedStreamingThreshold()); } public static Builder newBuilder() { @@ -168,12 +181,20 @@ public class DatastoreContext { return configurationReader; } - public long getShardElectionTimeoutFactor(){ + public long getShardElectionTimeoutFactor() { return raftConfig.getElectionTimeoutFactor(); } - public String getDataStoreType(){ - return dataStoreType; + public String getDataStoreName() { + return dataStoreName; + } + + public LogicalDatastoreType getLogicalStoreType() { + return logicalStoreType; + } + + public YangInstanceIdentifier getStoreRoot() { + return storeRoot; } public long getTransactionCreationInitialRateLimit() { @@ -184,16 +205,32 @@ public class DatastoreContext { return shardManagerPersistenceId; } + public String getTempFileDirectory() { + return raftConfig.getTempFileDirectory(); + } + + private void setTempFileDirectory(String tempFileDirectory) { + raftConfig.setTempFileDirectory(tempFileDirectory); + } + + public int getFileBackedStreamingThreshold() { + return raftConfig.getFileBackedStreamingThreshold(); + } + + private void setFileBackedStreamingThreshold(int fileBackedStreamingThreshold) { + raftConfig.setFileBackedStreamingThreshold(fileBackedStreamingThreshold); + } + private void setPeerAddressResolver(PeerAddressResolver resolver) { raftConfig.setPeerAddressResolver(resolver); } - private void setHeartbeatInterval(long shardHeartbeatIntervalInMillis){ + private void setHeartbeatInterval(long shardHeartbeatIntervalInMillis) { raftConfig.setHeartBeatInterval(new FiniteDuration(shardHeartbeatIntervalInMillis, TimeUnit.MILLISECONDS)); } - private void setShardJournalRecoveryLogBatchSize(int shardJournalRecoveryLogBatchSize){ + private void setShardJournalRecoveryLogBatchSize(int shardJournalRecoveryLogBatchSize) { raftConfig.setJournalRecoveryLogBatchSize(shardJournalRecoveryLogBatchSize); } @@ -212,6 +249,8 @@ public class DatastoreContext { } private void setSnapshotDataThresholdPercentage(int shardSnapshotDataThresholdPercentage) { + Preconditions.checkArgument(shardSnapshotDataThresholdPercentage >= 0 + && shardSnapshotDataThresholdPercentage <= 100); raftConfig.setSnapshotDataThresholdPercentage(shardSnapshotDataThresholdPercentage); } @@ -239,6 +278,10 @@ public class DatastoreContext { return transactionDebugContextEnabled; } + public boolean isUseTellBasedProtocol() { + return useTellBasedProtocol; + } + public int getShardSnapshotChunkSize() { return raftConfig.getSnapshotChunkSize(); } @@ -257,7 +300,7 @@ public class DatastoreContext { private Builder(DatastoreContext datastoreContext) { this.datastoreContext = datastoreContext; - if(datastoreContext.getDataStoreProperties() != null) { + if (datastoreContext.getDataStoreProperties() != null) { maxShardDataChangeExecutorPoolSize = datastoreContext.getDataStoreProperties().getMaxDataChangeExecutorPoolSize(); maxShardDataChangeExecutorQueueSize = @@ -352,12 +395,12 @@ public class DatastoreContext { return shardLeaderElectionTimeout(timeout, TimeUnit.SECONDS); } - public Builder configurationReader(AkkaConfigurationReader configurationReader){ + public Builder configurationReader(AkkaConfigurationReader configurationReader) { datastoreContext.configurationReader = configurationReader; return this; } - public Builder persistent(boolean persistent){ + public Builder persistent(boolean persistent) { datastoreContext.persistent = persistent; return this; } @@ -367,23 +410,46 @@ public class DatastoreContext { return this; } - public Builder shardElectionTimeoutFactor(long shardElectionTimeoutFactor){ + public Builder shardElectionTimeoutFactor(long shardElectionTimeoutFactor) { datastoreContext.setElectionTimeoutFactor(shardElectionTimeoutFactor); return this; } - public Builder transactionCreationInitialRateLimit(long initialRateLimit){ + public Builder transactionCreationInitialRateLimit(long initialRateLimit) { datastoreContext.transactionCreationInitialRateLimit = initialRateLimit; return this; } - public Builder dataStoreType(String dataStoreType){ - datastoreContext.dataStoreType = dataStoreType; - datastoreContext.dataStoreMXBeanType = "Distributed" + WordUtils.capitalize(dataStoreType) + "Datastore"; + public Builder logicalStoreType(LogicalDatastoreType logicalStoreType) { + datastoreContext.logicalStoreType = Preconditions.checkNotNull(logicalStoreType); + + // Retain compatible naming + switch (logicalStoreType) { + case CONFIGURATION: + dataStoreName("config"); + break; + case OPERATIONAL: + dataStoreName("operational"); + break; + default: + dataStoreName(logicalStoreType.name()); + } + + return this; + } + + public Builder storeRoot(final YangInstanceIdentifier storeRoot) { + datastoreContext.storeRoot = storeRoot; return this; } - public Builder shardBatchedModificationCount(int shardBatchedModificationCount) { + public Builder dataStoreName(String dataStoreName) { + datastoreContext.dataStoreName = Preconditions.checkNotNull(dataStoreName); + datastoreContext.dataStoreMXBeanType = "Distributed" + WordUtils.capitalize(dataStoreName) + "Datastore"; + return this; + } + + public Builder shardBatchedModificationCount(final int shardBatchedModificationCount) { datastoreContext.shardBatchedModificationCount = shardBatchedModificationCount; return this; } @@ -429,6 +495,11 @@ public class DatastoreContext { return this; } + public Builder useTellBasedProtocol(boolean value) { + datastoreContext.useTellBasedProtocol = value; + return this; + } + /** * For unit tests only. */ @@ -443,8 +514,8 @@ public class DatastoreContext { maxShardDataChangeExecutorPoolSize, maxShardDataChangeExecutorQueueSize, maxShardDataChangeListenerQueueSize, maxShardDataStoreExecutorQueueSize); - if(datastoreContext.dataStoreType != null) { - globalDatastoreTypes.add(datastoreContext.dataStoreType); + if (datastoreContext.dataStoreName != null) { + GLOBAL_DATASTORE_NAMES.add(datastoreContext.dataStoreName); } return datastoreContext; @@ -464,5 +535,15 @@ public class DatastoreContext { datastoreContext.setPeerAddressResolver(resolver); return this; } + + public Builder tempFileDirectory(String tempFileDirectory) { + datastoreContext.setTempFileDirectory(tempFileDirectory); + return this; + } + + public Builder fileBackedStreamingThresholdInMegabytes(int fileBackedStreamingThreshold) { + datastoreContext.setFileBackedStreamingThreshold(fileBackedStreamingThreshold * ConfigParams.MEGABYTE); + return this; + } } }