X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FDatastoreContext.java;h=dee8142bbc0904e6666784143d8f1cb795f27baa;hb=27b168d3ca3807123b4877f1ad0662b2610f393d;hp=02f2768fbb2a74f3fce79994800dbfb88da774d9;hpb=b25ae9347455b1bae8f25424a9ceffc017f2f0db;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContext.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContext.java index 02f2768fbb..dee8142bbc 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContext.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContext.java @@ -15,6 +15,8 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import org.apache.commons.lang3.text.WordUtils; +import org.opendaylight.controller.cluster.access.client.AbstractClientConnection; +import org.opendaylight.controller.cluster.access.client.ClientActorConfig; import org.opendaylight.controller.cluster.common.actor.AkkaConfigurationReader; import org.opendaylight.controller.cluster.common.actor.FileAkkaConfigurationReader; import org.opendaylight.controller.cluster.raft.ConfigParams; @@ -23,6 +25,8 @@ import org.opendaylight.controller.cluster.raft.PeerAddressResolver; import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreConfigProperties; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import scala.concurrent.duration.Duration; import scala.concurrent.duration.FiniteDuration; @@ -31,7 +35,7 @@ import scala.concurrent.duration.FiniteDuration; * * @author Thomas Pantelis */ -public class DatastoreContext { +public class DatastoreContext implements ClientActorConfig { public static final String METRICS_DOMAIN = "org.opendaylight.controller.cluster.datastore"; public static final Duration DEFAULT_SHARD_TRANSACTION_IDLE_TIMEOUT = Duration.create(10, TimeUnit.MINUTES); @@ -54,10 +58,12 @@ public class DatastoreContext { public static final int DEFAULT_SHARD_BATCHED_MODIFICATION_COUNT = 1000; public static final long DEFAULT_SHARD_COMMIT_QUEUE_EXPIRY_TIMEOUT_IN_MS = TimeUnit.MILLISECONDS.convert(2, TimeUnit.MINUTES); - public static final int DEFAULT_SHARD_SNAPSHOT_CHUNK_SIZE = 2048000; + public static final int DEFAULT_MAX_MESSAGE_SLICE_SIZE = 2048 * 1000; // 2MB public static final long DEFAULT_SYNC_INDEX_THRESHOLD = 10; + private static final Logger LOG = LoggerFactory.getLogger(DatastoreContext.class); + private static final Set GLOBAL_DATASTORE_NAMES = ConcurrentHashMap.newKeySet(); private final DefaultConfigParamsImpl raftConfig = new DefaultConfigParamsImpl(); @@ -82,6 +88,10 @@ public class DatastoreContext { private boolean useTellBasedProtocol = false; private boolean transactionDebugContextEnabled = false; private String shardManagerPersistenceId; + private int maximumMessageSliceSize = DEFAULT_MAX_MESSAGE_SLICE_SIZE; + private long backendAlivenessTimerInterval = AbstractClientConnection.DEFAULT_BACKEND_ALIVE_TIMEOUT_NANOS; + private long requestTimeout = AbstractClientConnection.DEFAULT_REQUEST_TIMEOUT_NANOS; + private long noProgressTimeout = AbstractClientConnection.DEFAULT_NO_PROGRESS_TIMEOUT_NANOS; public static Set getGlobalDatastoreNames() { return GLOBAL_DATASTORE_NAMES; @@ -94,8 +104,8 @@ public class DatastoreContext { setIsolatedLeaderCheckInterval(DEFAULT_ISOLATED_LEADER_CHECK_INTERVAL_IN_MILLIS); setSnapshotDataThresholdPercentage(DEFAULT_SHARD_SNAPSHOT_DATA_THRESHOLD_PERCENTAGE); setElectionTimeoutFactor(DEFAULT_SHARD_ELECTION_TIMEOUT_FACTOR); - setShardSnapshotChunkSize(DEFAULT_SHARD_SNAPSHOT_CHUNK_SIZE); setSyncIndexThreshold(DEFAULT_SYNC_INDEX_THRESHOLD); + setMaximumMessageSliceSize(DEFAULT_MAX_MESSAGE_SLICE_SIZE); } private DatastoreContext(final DatastoreContext other) { @@ -119,6 +129,9 @@ public class DatastoreContext { this.transactionDebugContextEnabled = other.transactionDebugContextEnabled; this.shardManagerPersistenceId = other.shardManagerPersistenceId; this.useTellBasedProtocol = other.useTellBasedProtocol; + this.backendAlivenessTimerInterval = other.backendAlivenessTimerInterval; + this.requestTimeout = other.requestTimeout; + this.noProgressTimeout = other.noProgressTimeout; setShardJournalRecoveryLogBatchSize(other.raftConfig.getJournalRecoveryLogBatchSize()); setSnapshotBatchCount(other.raftConfig.getSnapshotBatchCount()); @@ -127,6 +140,7 @@ public class DatastoreContext { setSnapshotDataThresholdPercentage(other.raftConfig.getSnapshotDataThresholdPercentage()); setElectionTimeoutFactor(other.raftConfig.getElectionTimeoutFactor()); setCustomRaftPolicyImplementation(other.raftConfig.getCustomRaftPolicyImplementationClass()); + setMaximumMessageSliceSize(other.getMaximumMessageSliceSize()); setShardSnapshotChunkSize(other.raftConfig.getSnapshotChunkSize()); setPeerAddressResolver(other.raftConfig.getPeerAddressResolver()); setTempFileDirectory(other.getTempFileDirectory()); @@ -210,6 +224,7 @@ public class DatastoreContext { return shardManagerPersistenceId; } + @Override public String getTempFileDirectory() { return raftConfig.getTempFileDirectory(); } @@ -218,6 +233,7 @@ public class DatastoreContext { raftConfig.setTempFileDirectory(tempFileDirectory); } + @Override public int getFileBackedStreamingThreshold() { return raftConfig.getFileBackedStreamingThreshold(); } @@ -263,8 +279,18 @@ public class DatastoreContext { raftConfig.setSnapshotBatchCount(shardSnapshotBatchCount); } + @Deprecated private void setShardSnapshotChunkSize(final int shardSnapshotChunkSize) { - raftConfig.setSnapshotChunkSize(shardSnapshotChunkSize); + // We'll honor the shardSnapshotChunkSize setting for backwards compatibility but only if it doesn't exceed + // maximumMessageSliceSize. + if (shardSnapshotChunkSize < maximumMessageSliceSize) { + raftConfig.setSnapshotChunkSize(shardSnapshotChunkSize); + } + } + + private void setMaximumMessageSliceSize(final int maximumMessageSliceSize) { + raftConfig.setSnapshotChunkSize(maximumMessageSliceSize); + this.maximumMessageSliceSize = maximumMessageSliceSize; } private void setSyncIndexThreshold(final long syncIndexThreshold) { @@ -291,8 +317,24 @@ public class DatastoreContext { return useTellBasedProtocol; } - public int getShardSnapshotChunkSize() { - return raftConfig.getSnapshotChunkSize(); + @Override + public int getMaximumMessageSliceSize() { + return maximumMessageSliceSize; + } + + @Override + public long getBackendAlivenessTimerInterval() { + return backendAlivenessTimerInterval; + } + + @Override + public long getRequestTimeout() { + return requestTimeout; + } + + @Override + public long getNoProgressTimeout() { + return noProgressTimeout; } public static class Builder implements org.opendaylight.yangtools.concepts.Builder { @@ -523,11 +565,19 @@ public class DatastoreContext { return this; } + @Deprecated public Builder shardSnapshotChunkSize(final int shardSnapshotChunkSize) { + LOG.warn("The shard-snapshot-chunk-size configuration parameter is deprecated - " + + "use maximum-message-slice-size instead"); datastoreContext.setShardSnapshotChunkSize(shardSnapshotChunkSize); return this; } + public Builder maximumMessageSliceSize(final int maximumMessageSliceSize) { + datastoreContext.setMaximumMessageSliceSize(maximumMessageSliceSize); + return this; + } + public Builder shardPeerAddressResolver(final PeerAddressResolver resolver) { datastoreContext.setPeerAddressResolver(resolver); return this; @@ -538,7 +588,7 @@ public class DatastoreContext { return this; } - public Builder fileBackedStreamingThresholdInMegabytes(final int fileBackedStreamingThreshold) { + public Builder fileBackedStreamingThresholdInMegabytes(final int fileBackedStreamingThreshold) { datastoreContext.setFileBackedStreamingThreshold(fileBackedStreamingThreshold * ConfigParams.MEGABYTE); return this; } @@ -548,6 +598,21 @@ public class DatastoreContext { return this; } + public Builder backendAlivenessTimerIntervalInSeconds(final long interval) { + datastoreContext.backendAlivenessTimerInterval = TimeUnit.SECONDS.toNanos(interval); + return this; + } + + public Builder frontendRequestTimeoutInSeconds(final long timeout) { + datastoreContext.requestTimeout = TimeUnit.SECONDS.toNanos(timeout); + return this; + } + + public Builder frontendNoProgressTimeoutInSeconds(final long timeout) { + datastoreContext.noProgressTimeout = TimeUnit.SECONDS.toNanos(timeout); + return this; + } + @Override public DatastoreContext build() { datastoreContext.dataStoreProperties = InMemoryDOMDataStoreConfigProperties.create(