X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FDatastoreContext.java;h=1a46e3d4525f10f4bc616153e1f74733eeb81192;hb=bba56a2445d4ba38c7ecdee30a568bdc43d1d9de;hp=8be23d1994fa99b4cbbed31cd1316eb94e817976;hpb=2d60632f7cf63712e8357a3cf3fc40d83366e5e6;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContext.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContext.java index 8be23d1994..1a46e3d452 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContext.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContext.java @@ -5,7 +5,6 @@ * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ - package org.opendaylight.controller.cluster.datastore; import akka.util.Timeout; @@ -14,7 +13,7 @@ import com.google.common.base.Preconditions; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; -import org.apache.commons.lang3.text.WordUtils; +import org.apache.commons.text.WordUtils; import org.opendaylight.controller.cluster.access.client.AbstractClientConnection; import org.opendaylight.controller.cluster.access.client.ClientActorConfig; import org.opendaylight.controller.cluster.common.actor.AkkaConfigurationReader; @@ -22,12 +21,11 @@ import org.opendaylight.controller.cluster.common.actor.FileAkkaConfigurationRea import org.opendaylight.controller.cluster.raft.ConfigParams; import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl; import org.opendaylight.controller.cluster.raft.PeerAddressResolver; -import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; -import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreConfigProperties; +import org.opendaylight.mdsal.common.api.LogicalDatastoreType; +import org.opendaylight.mdsal.dom.store.inmemory.InMemoryDOMDataStoreConfigProperties; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import scala.concurrent.duration.Duration; import scala.concurrent.duration.FiniteDuration; /** @@ -35,11 +33,12 @@ import scala.concurrent.duration.FiniteDuration; * * @author Thomas Pantelis */ -// Noo-final for mocking +// Non-final for mocking public class DatastoreContext implements ClientActorConfig { public static final String METRICS_DOMAIN = "org.opendaylight.controller.cluster.datastore"; - public static final Duration DEFAULT_SHARD_TRANSACTION_IDLE_TIMEOUT = Duration.create(10, TimeUnit.MINUTES); + public static final FiniteDuration DEFAULT_SHARD_TRANSACTION_IDLE_TIMEOUT = FiniteDuration.create(10, + TimeUnit.MINUTES); public static final int DEFAULT_OPERATION_TIMEOUT_IN_MS = 5000; public static final int DEFAULT_SHARD_TX_COMMIT_TIMEOUT_IN_SECONDS = 30; public static final int DEFAULT_JOURNAL_RECOVERY_BATCH_SIZE = 1; @@ -54,12 +53,14 @@ public class DatastoreContext implements ClientActorConfig { public static final FileAkkaConfigurationReader DEFAULT_CONFIGURATION_READER = new FileAkkaConfigurationReader(); public static final int DEFAULT_SHARD_SNAPSHOT_DATA_THRESHOLD_PERCENTAGE = 12; public static final int DEFAULT_SHARD_ELECTION_TIMEOUT_FACTOR = 2; + public static final int DEFAULT_SHARD_CANDIDATE_ELECTION_TIMEOUT_DIVISOR = 1; public static final int DEFAULT_TX_CREATION_INITIAL_RATE_LIMIT = 100; public static final String UNKNOWN_DATA_STORE_TYPE = "unknown"; public static final int DEFAULT_SHARD_BATCHED_MODIFICATION_COUNT = 1000; public static final long DEFAULT_SHARD_COMMIT_QUEUE_EXPIRY_TIMEOUT_IN_MS = TimeUnit.MILLISECONDS.convert(2, TimeUnit.MINUTES); public static final int DEFAULT_MAX_MESSAGE_SLICE_SIZE = 2048 * 1000; // 2MB + public static final int DEFAULT_INITIAL_PAYLOAD_SERIALIZED_BUFFER_CAPACITY = 512; public static final long DEFAULT_SYNC_INDEX_THRESHOLD = 10; @@ -70,7 +71,7 @@ public class DatastoreContext implements ClientActorConfig { private final DefaultConfigParamsImpl raftConfig = new DefaultConfigParamsImpl(); private InMemoryDOMDataStoreConfigProperties dataStoreProperties; - private Duration shardTransactionIdleTimeout = DatastoreContext.DEFAULT_SHARD_TRANSACTION_IDLE_TIMEOUT; + private FiniteDuration shardTransactionIdleTimeout = DatastoreContext.DEFAULT_SHARD_TRANSACTION_IDLE_TIMEOUT; private long operationTimeoutInMillis = DEFAULT_OPERATION_TIMEOUT_IN_MS; private String dataStoreMXBeanType; private int shardTransactionCommitTimeoutInSeconds = DEFAULT_SHARD_TX_COMMIT_TIMEOUT_IN_SECONDS; @@ -93,6 +94,7 @@ public class DatastoreContext implements ClientActorConfig { private long backendAlivenessTimerInterval = AbstractClientConnection.DEFAULT_BACKEND_ALIVE_TIMEOUT_NANOS; private long requestTimeout = AbstractClientConnection.DEFAULT_REQUEST_TIMEOUT_NANOS; private long noProgressTimeout = AbstractClientConnection.DEFAULT_NO_PROGRESS_TIMEOUT_NANOS; + private int initialPayloadSerializedBufferCapacity = DEFAULT_INITIAL_PAYLOAD_SERIALIZED_BUFFER_CAPACITY; public static Set getGlobalDatastoreNames() { return GLOBAL_DATASTORE_NAMES; @@ -105,6 +107,7 @@ public class DatastoreContext implements ClientActorConfig { setIsolatedLeaderCheckInterval(DEFAULT_ISOLATED_LEADER_CHECK_INTERVAL_IN_MILLIS); setSnapshotDataThresholdPercentage(DEFAULT_SHARD_SNAPSHOT_DATA_THRESHOLD_PERCENTAGE); setElectionTimeoutFactor(DEFAULT_SHARD_ELECTION_TIMEOUT_FACTOR); + setCandidateElectionTimeoutDivisor(DEFAULT_SHARD_CANDIDATE_ELECTION_TIMEOUT_DIVISOR); setSyncIndexThreshold(DEFAULT_SYNC_INDEX_THRESHOLD); setMaximumMessageSliceSize(DEFAULT_MAX_MESSAGE_SLICE_SIZE); } @@ -133,6 +136,7 @@ public class DatastoreContext implements ClientActorConfig { this.backendAlivenessTimerInterval = other.backendAlivenessTimerInterval; this.requestTimeout = other.requestTimeout; this.noProgressTimeout = other.noProgressTimeout; + this.initialPayloadSerializedBufferCapacity = other.initialPayloadSerializedBufferCapacity; setShardJournalRecoveryLogBatchSize(other.raftConfig.getJournalRecoveryLogBatchSize()); setSnapshotBatchCount(other.raftConfig.getSnapshotBatchCount()); @@ -140,6 +144,7 @@ public class DatastoreContext implements ClientActorConfig { setIsolatedLeaderCheckInterval(other.raftConfig.getIsolatedCheckIntervalInMillis()); setSnapshotDataThresholdPercentage(other.raftConfig.getSnapshotDataThresholdPercentage()); setElectionTimeoutFactor(other.raftConfig.getElectionTimeoutFactor()); + setCandidateElectionTimeoutDivisor(other.raftConfig.getCandidateElectionTimeoutDivisor()); setCustomRaftPolicyImplementation(other.raftConfig.getCustomRaftPolicyImplementationClass()); setMaximumMessageSliceSize(other.getMaximumMessageSliceSize()); setShardSnapshotChunkSize(other.raftConfig.getSnapshotChunkSize()); @@ -161,7 +166,7 @@ public class DatastoreContext implements ClientActorConfig { return dataStoreProperties; } - public Duration getShardTransactionIdleTimeout() { + public FiniteDuration getShardTransactionIdleTimeout() { return shardTransactionIdleTimeout; } @@ -266,6 +271,10 @@ public class DatastoreContext implements ClientActorConfig { raftConfig.setElectionTimeoutFactor(shardElectionTimeoutFactor); } + private void setCandidateElectionTimeoutDivisor(final long candidateElectionTimeoutDivisor) { + raftConfig.setCandidateElectionTimeoutDivisor(candidateElectionTimeoutDivisor); + } + private void setCustomRaftPolicyImplementation(final String customRaftPolicyImplementation) { raftConfig.setCustomRaftPolicyImplementationClass(customRaftPolicyImplementation); } @@ -338,6 +347,10 @@ public class DatastoreContext implements ClientActorConfig { return noProgressTimeout; } + public int getInitialPayloadSerializedBufferCapacity() { + return initialPayloadSerializedBufferCapacity; + } + public static class Builder implements org.opendaylight.yangtools.concepts.Builder { private final DatastoreContext datastoreContext; private int maxShardDataChangeExecutorPoolSize = @@ -376,7 +389,7 @@ public class DatastoreContext implements ClientActorConfig { public Builder shardTransactionIdleTimeout(final long timeout, final TimeUnit unit) { - datastoreContext.shardTransactionIdleTimeout = Duration.create(timeout, unit); + datastoreContext.shardTransactionIdleTimeout = FiniteDuration.create(timeout, unit); return this; } @@ -467,6 +480,11 @@ public class DatastoreContext implements ClientActorConfig { return this; } + public Builder shardCandidateElectionTimeoutDivisor(final long candidateElectionTimeoutDivisor) { + datastoreContext.setCandidateElectionTimeoutDivisor(candidateElectionTimeoutDivisor); + return this; + } + public Builder transactionCreationInitialRateLimit(final long initialRateLimit) { datastoreContext.transactionCreationInitialRateLimit = initialRateLimit; return this; @@ -614,11 +632,19 @@ public class DatastoreContext implements ClientActorConfig { return this; } + public Builder initialPayloadSerializedBufferCapacity(final int capacity) { + datastoreContext.initialPayloadSerializedBufferCapacity = capacity; + return this; + } + @Override public DatastoreContext build() { - datastoreContext.dataStoreProperties = InMemoryDOMDataStoreConfigProperties.create( - maxShardDataChangeExecutorPoolSize, maxShardDataChangeExecutorQueueSize, - maxShardDataChangeListenerQueueSize, maxShardDataStoreExecutorQueueSize); + datastoreContext.dataStoreProperties = InMemoryDOMDataStoreConfigProperties.builder() + .maxDataChangeExecutorPoolSize(maxShardDataChangeExecutorPoolSize) + .maxDataChangeExecutorQueueSize(maxShardDataChangeExecutorQueueSize) + .maxDataChangeListenerQueueSize(maxShardDataChangeListenerQueueSize) + .maxDataStoreExecutorQueueSize(maxShardDataStoreExecutorQueueSize) + .build(); if (datastoreContext.dataStoreName != null) { GLOBAL_DATASTORE_NAMES.add(datastoreContext.dataStoreName);