X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FDatastoreContext.java;h=e6d8ed84573d0094e222b38f0088a8175c953280;hb=178ebab612c3ddd338e759ca7e929c25c623b0b3;hp=2e524b96b78036cb41c9d1019fbfdaf9c9d766d8;hpb=35235f427f3a056f85fe83ddd1133e67540328f7;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContext.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContext.java index 2e524b96b7..e6d8ed8457 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContext.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContext.java @@ -7,9 +7,11 @@ */ package org.opendaylight.controller.cluster.datastore; +import static com.google.common.base.Preconditions.checkArgument; +import static java.util.Objects.requireNonNull; + import akka.util.Timeout; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; @@ -26,7 +28,6 @@ import org.opendaylight.mdsal.dom.store.inmemory.InMemoryDOMDataStoreConfigPrope import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import scala.concurrent.duration.Duration; import scala.concurrent.duration.FiniteDuration; /** @@ -34,7 +35,7 @@ import scala.concurrent.duration.FiniteDuration; * * @author Thomas Pantelis */ -// Noo-final for mocking +// Non-final for mocking public class DatastoreContext implements ClientActorConfig { public static final String METRICS_DOMAIN = "org.opendaylight.controller.cluster.datastore"; @@ -50,16 +51,19 @@ public class DatastoreContext implements ClientActorConfig { public static final int DEFAULT_SHARD_TX_COMMIT_QUEUE_CAPACITY = 50000; public static final Timeout DEFAULT_SHARD_INITIALIZATION_TIMEOUT = new Timeout(5, TimeUnit.MINUTES); public static final Timeout DEFAULT_SHARD_LEADER_ELECTION_TIMEOUT = new Timeout(30, TimeUnit.SECONDS); + public static final int DEFAULT_INITIAL_SETTLE_TIMEOUT_MULTIPLIER = 3; public static final boolean DEFAULT_PERSISTENT = true; public static final FileAkkaConfigurationReader DEFAULT_CONFIGURATION_READER = new FileAkkaConfigurationReader(); public static final int DEFAULT_SHARD_SNAPSHOT_DATA_THRESHOLD_PERCENTAGE = 12; public static final int DEFAULT_SHARD_ELECTION_TIMEOUT_FACTOR = 2; + public static final int DEFAULT_SHARD_CANDIDATE_ELECTION_TIMEOUT_DIVISOR = 1; public static final int DEFAULT_TX_CREATION_INITIAL_RATE_LIMIT = 100; public static final String UNKNOWN_DATA_STORE_TYPE = "unknown"; public static final int DEFAULT_SHARD_BATCHED_MODIFICATION_COUNT = 1000; public static final long DEFAULT_SHARD_COMMIT_QUEUE_EXPIRY_TIMEOUT_IN_MS = TimeUnit.MILLISECONDS.convert(2, TimeUnit.MINUTES); public static final int DEFAULT_MAX_MESSAGE_SLICE_SIZE = 2048 * 1000; // 2MB + public static final int DEFAULT_INITIAL_PAYLOAD_SERIALIZED_BUFFER_CAPACITY = 512; public static final long DEFAULT_SYNC_INDEX_THRESHOLD = 10; @@ -77,12 +81,13 @@ public class DatastoreContext implements ClientActorConfig { private int shardTransactionCommitQueueCapacity = DEFAULT_SHARD_TX_COMMIT_QUEUE_CAPACITY; private Timeout shardInitializationTimeout = DEFAULT_SHARD_INITIALIZATION_TIMEOUT; private Timeout shardLeaderElectionTimeout = DEFAULT_SHARD_LEADER_ELECTION_TIMEOUT; + private int initialSettleTimeoutMultiplier = DEFAULT_INITIAL_SETTLE_TIMEOUT_MULTIPLIER; private boolean persistent = DEFAULT_PERSISTENT; private AkkaConfigurationReader configurationReader = DEFAULT_CONFIGURATION_READER; private long transactionCreationInitialRateLimit = DEFAULT_TX_CREATION_INITIAL_RATE_LIMIT; private String dataStoreName = UNKNOWN_DATA_STORE_TYPE; private LogicalDatastoreType logicalStoreType = LogicalDatastoreType.OPERATIONAL; - private YangInstanceIdentifier storeRoot = YangInstanceIdentifier.EMPTY; + private YangInstanceIdentifier storeRoot = YangInstanceIdentifier.empty(); private int shardBatchedModificationCount = DEFAULT_SHARD_BATCHED_MODIFICATION_COUNT; private boolean writeOnlyTransactionOptimizationsEnabled = true; private long shardCommitQueueExpiryTimeoutInMillis = DEFAULT_SHARD_COMMIT_QUEUE_EXPIRY_TIMEOUT_IN_MS; @@ -93,6 +98,7 @@ public class DatastoreContext implements ClientActorConfig { private long backendAlivenessTimerInterval = AbstractClientConnection.DEFAULT_BACKEND_ALIVE_TIMEOUT_NANOS; private long requestTimeout = AbstractClientConnection.DEFAULT_REQUEST_TIMEOUT_NANOS; private long noProgressTimeout = AbstractClientConnection.DEFAULT_NO_PROGRESS_TIMEOUT_NANOS; + private int initialPayloadSerializedBufferCapacity = DEFAULT_INITIAL_PAYLOAD_SERIALIZED_BUFFER_CAPACITY; public static Set getGlobalDatastoreNames() { return GLOBAL_DATASTORE_NAMES; @@ -105,6 +111,7 @@ public class DatastoreContext implements ClientActorConfig { setIsolatedLeaderCheckInterval(DEFAULT_ISOLATED_LEADER_CHECK_INTERVAL_IN_MILLIS); setSnapshotDataThresholdPercentage(DEFAULT_SHARD_SNAPSHOT_DATA_THRESHOLD_PERCENTAGE); setElectionTimeoutFactor(DEFAULT_SHARD_ELECTION_TIMEOUT_FACTOR); + setCandidateElectionTimeoutDivisor(DEFAULT_SHARD_CANDIDATE_ELECTION_TIMEOUT_DIVISOR); setSyncIndexThreshold(DEFAULT_SYNC_INDEX_THRESHOLD); setMaximumMessageSliceSize(DEFAULT_MAX_MESSAGE_SLICE_SIZE); } @@ -118,6 +125,7 @@ public class DatastoreContext implements ClientActorConfig { this.shardTransactionCommitQueueCapacity = other.shardTransactionCommitQueueCapacity; this.shardInitializationTimeout = other.shardInitializationTimeout; this.shardLeaderElectionTimeout = other.shardLeaderElectionTimeout; + this.initialSettleTimeoutMultiplier = other.initialSettleTimeoutMultiplier; this.persistent = other.persistent; this.configurationReader = other.configurationReader; this.transactionCreationInitialRateLimit = other.transactionCreationInitialRateLimit; @@ -133,6 +141,7 @@ public class DatastoreContext implements ClientActorConfig { this.backendAlivenessTimerInterval = other.backendAlivenessTimerInterval; this.requestTimeout = other.requestTimeout; this.noProgressTimeout = other.noProgressTimeout; + this.initialPayloadSerializedBufferCapacity = other.initialPayloadSerializedBufferCapacity; setShardJournalRecoveryLogBatchSize(other.raftConfig.getJournalRecoveryLogBatchSize()); setSnapshotBatchCount(other.raftConfig.getSnapshotBatchCount()); @@ -140,6 +149,7 @@ public class DatastoreContext implements ClientActorConfig { setIsolatedLeaderCheckInterval(other.raftConfig.getIsolatedCheckIntervalInMillis()); setSnapshotDataThresholdPercentage(other.raftConfig.getSnapshotDataThresholdPercentage()); setElectionTimeoutFactor(other.raftConfig.getElectionTimeoutFactor()); + setCandidateElectionTimeoutDivisor(other.raftConfig.getCandidateElectionTimeoutDivisor()); setCustomRaftPolicyImplementation(other.raftConfig.getCustomRaftPolicyImplementationClass()); setMaximumMessageSliceSize(other.getMaximumMessageSliceSize()); setShardSnapshotChunkSize(other.raftConfig.getSnapshotChunkSize()); @@ -161,7 +171,7 @@ public class DatastoreContext implements ClientActorConfig { return dataStoreProperties; } - public Duration getShardTransactionIdleTimeout() { + public FiniteDuration getShardTransactionIdleTimeout() { return shardTransactionIdleTimeout; } @@ -193,6 +203,16 @@ public class DatastoreContext implements ClientActorConfig { return shardLeaderElectionTimeout; } + /** + * Return the multiplier of {@link #getShardLeaderElectionTimeout()} which the frontend will wait for all shards + * on the local node to settle. + * + * @return Non-negative multiplier. Value of {@code 0} indicates to wait indefinitely. + */ + public int getInitialSettleTimeoutMultiplier() { + return initialSettleTimeoutMultiplier; + } + public boolean isPersistent() { return persistent; } @@ -266,13 +286,16 @@ public class DatastoreContext implements ClientActorConfig { raftConfig.setElectionTimeoutFactor(shardElectionTimeoutFactor); } + private void setCandidateElectionTimeoutDivisor(final long candidateElectionTimeoutDivisor) { + raftConfig.setCandidateElectionTimeoutDivisor(candidateElectionTimeoutDivisor); + } + private void setCustomRaftPolicyImplementation(final String customRaftPolicyImplementation) { raftConfig.setCustomRaftPolicyImplementationClass(customRaftPolicyImplementation); } private void setSnapshotDataThresholdPercentage(final int shardSnapshotDataThresholdPercentage) { - Preconditions.checkArgument(shardSnapshotDataThresholdPercentage >= 0 - && shardSnapshotDataThresholdPercentage <= 100); + checkArgument(shardSnapshotDataThresholdPercentage >= 0 && shardSnapshotDataThresholdPercentage <= 100); raftConfig.setSnapshotDataThresholdPercentage(shardSnapshotDataThresholdPercentage); } @@ -338,6 +361,10 @@ public class DatastoreContext implements ClientActorConfig { return noProgressTimeout; } + public int getInitialPayloadSerializedBufferCapacity() { + return initialPayloadSerializedBufferCapacity; + } + public static class Builder implements org.opendaylight.yangtools.concepts.Builder { private final DatastoreContext datastoreContext; private int maxShardDataChangeExecutorPoolSize = @@ -443,6 +470,12 @@ public class DatastoreContext implements ClientActorConfig { return this; } + public Builder initialSettleTimeoutMultiplier(final int multiplier) { + checkArgument(multiplier >= 0); + datastoreContext.initialSettleTimeoutMultiplier = multiplier; + return this; + } + public Builder shardLeaderElectionTimeoutInSeconds(final long timeout) { return shardLeaderElectionTimeout(timeout, TimeUnit.SECONDS); } @@ -467,13 +500,18 @@ public class DatastoreContext implements ClientActorConfig { return this; } + public Builder shardCandidateElectionTimeoutDivisor(final long candidateElectionTimeoutDivisor) { + datastoreContext.setCandidateElectionTimeoutDivisor(candidateElectionTimeoutDivisor); + return this; + } + public Builder transactionCreationInitialRateLimit(final long initialRateLimit) { datastoreContext.transactionCreationInitialRateLimit = initialRateLimit; return this; } public Builder logicalStoreType(final LogicalDatastoreType logicalStoreType) { - datastoreContext.logicalStoreType = Preconditions.checkNotNull(logicalStoreType); + datastoreContext.logicalStoreType = requireNonNull(logicalStoreType); // Retain compatible naming switch (logicalStoreType) { @@ -496,7 +534,7 @@ public class DatastoreContext implements ClientActorConfig { } public Builder dataStoreName(final String dataStoreName) { - datastoreContext.dataStoreName = Preconditions.checkNotNull(dataStoreName); + datastoreContext.dataStoreName = requireNonNull(dataStoreName); datastoreContext.dataStoreMXBeanType = "Distributed" + WordUtils.capitalize(dataStoreName) + "Datastore"; return this; } @@ -614,6 +652,11 @@ public class DatastoreContext implements ClientActorConfig { return this; } + public Builder initialPayloadSerializedBufferCapacity(final int capacity) { + datastoreContext.initialPayloadSerializedBufferCapacity = capacity; + return this; + } + @Override public DatastoreContext build() { datastoreContext.dataStoreProperties = InMemoryDOMDataStoreConfigProperties.builder()