From 953a7321c6613a84c798db1b23cdfa7b0f2cf755 Mon Sep 17 00:00:00 2001 From: Tom Pantelis Date: Wed, 21 Jun 2017 10:53:53 -0400 Subject: [PATCH] Bug 7449: Add maximum-message-slice-size config param Added a new maximum-message-slice-size config param that will be used when fragmenting messages thru the akka remoting framework. This is a generalized version of the shard-snapshot-chunk-size param and replaces it. Change-Id: I4dc4cc0de92d6f876e5587cd8cb3ade2abb59285 Signed-off-by: Tom Pantelis --- .../src/main/resources/initial/datastore.cfg | 19 ++++++----- .../cluster/datastore/DatastoreContext.java | 34 ++++++++++++++++--- .../mbeans/DatastoreConfigurationMXBean.java | 2 +- .../DatastoreConfigurationMXBeanImpl.java | 5 ++- ...tributedConfigDataStoreProviderModule.java | 2 +- ...tedOperationalDataStoreProviderModule.java | 2 +- .../yang/distributed-datastore-provider.yang | 8 +++++ .../datastore/DatastoreContextTest.java | 8 ++--- 8 files changed, 56 insertions(+), 24 deletions(-) diff --git a/opendaylight/md-sal/sal-clustering-config/src/main/resources/initial/datastore.cfg b/opendaylight/md-sal/sal-clustering-config/src/main/resources/initial/datastore.cfg index 3ab714a9ac..853c8f0d19 100644 --- a/opendaylight/md-sal/sal-clustering-config/src/main/resources/initial/datastore.cfg +++ b/opendaylight/md-sal/sal-clustering-config/src/main/resources/initial/datastore.cfg @@ -24,14 +24,14 @@ operational.persistent=false # The maximum amount of time a shard transaction can be idle without receiving any messages before it self-destructs. #shard-transaction-idle-timeout-in-minutes=10 -# The maximum amount of time a shard transaction three-phase commit can be idle without receiving the +# The maximum amount of time a shard transaction three-phase commit can be idle without receiving the # next messages before it aborts the transaction. #shard-transaction-commit-timeout-in-seconds=30 # The maximum allowed capacity for each shard's transaction commit queue. #shard-transaction-commit-queue-capacity=20000 -# The maximum amount of time to wait for a shard to initialize from persistence on startup before +# The maximum amount of time to wait for a shard to initialize from persistence on startup before # failing an operation (eg transaction create and change listener registration). #shard-initialization-timeout-in-seconds=300 @@ -41,20 +41,20 @@ operational.persistent=false # The percentage of Runtime.totalMemory() used by the in-memory journal log before a snapshot is to be taken. #shard-snapshot-data-threshold-percentage=12 -# The interval at which the leader of the shard will check if its majority followers are active and +# The interval at which the leader of the shard will check if its majority followers are active and # term itself as isolated. #shard-isolated-leader-check-interval-in-millis=5000 -# The number of transaction modification operations (put, merge, delete) to batch before sending to the -# shard transaction actor. Batching improves performance as less modifications messages are sent to the +# The number of transaction modification operations (put, merge, delete) to batch before sending to the +# shard transaction actor. Batching improves performance as less modifications messages are sent to the # actor and thus lessens the chance that the transaction actor's mailbox queue could get full. #shard-batched-modification-count=1000 # The maximum amount of time for akka operations (remote or local) to complete before failing. #operation-timeout-in-seconds=5 -# The initial number of transactions per second that are allowed before the data store should begin -# applying back pressure. This number is only used as an initial guidance, subsequently the datastore +# The initial number of transactions per second that are allowed before the data store should begin +# applying back pressure. This number is only used as an initial guidance, subsequently the datastore # measures the latency for a commit and auto-adjusts the rate limit. #transaction-creation-initial-rate-limit=100 @@ -78,8 +78,9 @@ operational.persistent=false # cannot be found then the default raft policy will be applied #custom-raft-policy-implementation= -# The maximum size (in bytes) for snapshot chunks to be sent during sync -#shard-snapshot-chunk-size=20480000 +# When fragmenting messages thru the akka remoting framework, this is the maximum size in bytes +# for a message slice. +#maximum-message-slice-size=20480000 # Enable tell-based protocol between frontend (applications) and backend (shards). Using this protocol # should avoid AskTimeoutExceptions seen under heavy load. Defaults to false (use ask-based protocol). diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContext.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContext.java index 02f2768fbb..07115b7169 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContext.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContext.java @@ -23,6 +23,8 @@ import org.opendaylight.controller.cluster.raft.PeerAddressResolver; import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreConfigProperties; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import scala.concurrent.duration.Duration; import scala.concurrent.duration.FiniteDuration; @@ -54,10 +56,12 @@ public class DatastoreContext { public static final int DEFAULT_SHARD_BATCHED_MODIFICATION_COUNT = 1000; public static final long DEFAULT_SHARD_COMMIT_QUEUE_EXPIRY_TIMEOUT_IN_MS = TimeUnit.MILLISECONDS.convert(2, TimeUnit.MINUTES); - public static final int DEFAULT_SHARD_SNAPSHOT_CHUNK_SIZE = 2048000; + public static final int DEFAULT_MAX_MESSAGE_SLICE_SIZE = 2048 * 1000; // 2MB public static final long DEFAULT_SYNC_INDEX_THRESHOLD = 10; + private static final Logger LOG = LoggerFactory.getLogger(DatastoreContext.class); + private static final Set GLOBAL_DATASTORE_NAMES = ConcurrentHashMap.newKeySet(); private final DefaultConfigParamsImpl raftConfig = new DefaultConfigParamsImpl(); @@ -82,6 +86,7 @@ public class DatastoreContext { private boolean useTellBasedProtocol = false; private boolean transactionDebugContextEnabled = false; private String shardManagerPersistenceId; + private int maximumMessageSliceSize = DEFAULT_MAX_MESSAGE_SLICE_SIZE; public static Set getGlobalDatastoreNames() { return GLOBAL_DATASTORE_NAMES; @@ -94,8 +99,8 @@ public class DatastoreContext { setIsolatedLeaderCheckInterval(DEFAULT_ISOLATED_LEADER_CHECK_INTERVAL_IN_MILLIS); setSnapshotDataThresholdPercentage(DEFAULT_SHARD_SNAPSHOT_DATA_THRESHOLD_PERCENTAGE); setElectionTimeoutFactor(DEFAULT_SHARD_ELECTION_TIMEOUT_FACTOR); - setShardSnapshotChunkSize(DEFAULT_SHARD_SNAPSHOT_CHUNK_SIZE); setSyncIndexThreshold(DEFAULT_SYNC_INDEX_THRESHOLD); + setMaximumMessageSliceSize(DEFAULT_MAX_MESSAGE_SLICE_SIZE); } private DatastoreContext(final DatastoreContext other) { @@ -127,6 +132,7 @@ public class DatastoreContext { setSnapshotDataThresholdPercentage(other.raftConfig.getSnapshotDataThresholdPercentage()); setElectionTimeoutFactor(other.raftConfig.getElectionTimeoutFactor()); setCustomRaftPolicyImplementation(other.raftConfig.getCustomRaftPolicyImplementationClass()); + setMaximumMessageSliceSize(other.getMaximumMessageSliceSize()); setShardSnapshotChunkSize(other.raftConfig.getSnapshotChunkSize()); setPeerAddressResolver(other.raftConfig.getPeerAddressResolver()); setTempFileDirectory(other.getTempFileDirectory()); @@ -263,8 +269,18 @@ public class DatastoreContext { raftConfig.setSnapshotBatchCount(shardSnapshotBatchCount); } + @Deprecated private void setShardSnapshotChunkSize(final int shardSnapshotChunkSize) { - raftConfig.setSnapshotChunkSize(shardSnapshotChunkSize); + // We'll honor the shardSnapshotChunkSize setting for backwards compatibility but only if it doesn't exceed + // maximumMessageSliceSize. + if (shardSnapshotChunkSize < maximumMessageSliceSize) { + raftConfig.setSnapshotChunkSize(shardSnapshotChunkSize); + } + } + + private void setMaximumMessageSliceSize(final int maximumMessageSliceSize) { + raftConfig.setSnapshotChunkSize(maximumMessageSliceSize); + this.maximumMessageSliceSize = maximumMessageSliceSize; } private void setSyncIndexThreshold(final long syncIndexThreshold) { @@ -291,8 +307,8 @@ public class DatastoreContext { return useTellBasedProtocol; } - public int getShardSnapshotChunkSize() { - return raftConfig.getSnapshotChunkSize(); + public int getMaximumMessageSliceSize() { + return maximumMessageSliceSize; } public static class Builder implements org.opendaylight.yangtools.concepts.Builder { @@ -523,11 +539,19 @@ public class DatastoreContext { return this; } + @Deprecated public Builder shardSnapshotChunkSize(final int shardSnapshotChunkSize) { + LOG.warn("The shard-snapshot-chunk-size configuration parameter is deprecated - " + + "use maximum-message-slice-size instead"); datastoreContext.setShardSnapshotChunkSize(shardSnapshotChunkSize); return this; } + public Builder maximumMessageSliceSize(final int maximumMessageSliceSize) { + datastoreContext.setMaximumMessageSliceSize(maximumMessageSliceSize); + return this; + } + public Builder shardPeerAddressResolver(final PeerAddressResolver resolver) { datastoreContext.setPeerAddressResolver(resolver); return this; diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/jmx/mbeans/DatastoreConfigurationMXBean.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/jmx/mbeans/DatastoreConfigurationMXBean.java index e5b1db98ec..3abdafd983 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/jmx/mbeans/DatastoreConfigurationMXBean.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/jmx/mbeans/DatastoreConfigurationMXBean.java @@ -54,5 +54,5 @@ public interface DatastoreConfigurationMXBean { int getMaxShardDataStoreExecutorQueueSize(); - int getShardSnapshotChunkSize(); + int getMaximumMessageSliceSize(); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/jmx/mbeans/DatastoreConfigurationMXBeanImpl.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/jmx/mbeans/DatastoreConfigurationMXBeanImpl.java index b2ff9fba40..20b6e79e07 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/jmx/mbeans/DatastoreConfigurationMXBeanImpl.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/jmx/mbeans/DatastoreConfigurationMXBeanImpl.java @@ -130,8 +130,7 @@ public class DatastoreConfigurationMXBeanImpl extends AbstractMXBean implements } @Override - public int getShardSnapshotChunkSize() { - return context.getShardSnapshotChunkSize(); + public int getMaximumMessageSliceSize() { + return context.getMaximumMessageSliceSize(); } - } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/distributed_datastore_provider/DistributedConfigDataStoreProviderModule.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/distributed_datastore_provider/DistributedConfigDataStoreProviderModule.java index a99b8c3a98..f0938e8994 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/distributed_datastore_provider/DistributedConfigDataStoreProviderModule.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/distributed_datastore_provider/DistributedConfigDataStoreProviderModule.java @@ -96,7 +96,7 @@ public class DistributedConfigDataStoreProviderModule extends AbstractDistribute props.getShardCommitQueueExpiryTimeoutInSeconds().getValue().intValue()) .transactionDebugContextEnabled(props.getTransactionDebugContextEnabled()) .customRaftPolicyImplementation(props.getCustomRaftPolicyImplementation()) - .shardSnapshotChunkSize(props.getShardSnapshotChunkSize().getValue().intValue()) + .maximumMessageSliceSize(props.getMaximumMessageSliceSize().getValue().intValue()) .useTellBasedProtocol(props.getUseTellBasedProtocol()) .syncIndexThreshold(props.getSyncIndexThreshold().getValue()) .build(); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/distributed_datastore_provider/DistributedOperationalDataStoreProviderModule.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/distributed_datastore_provider/DistributedOperationalDataStoreProviderModule.java index 44146aa8bb..a31d5177f0 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/distributed_datastore_provider/DistributedOperationalDataStoreProviderModule.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/distributed_datastore_provider/DistributedOperationalDataStoreProviderModule.java @@ -96,7 +96,7 @@ public class DistributedOperationalDataStoreProviderModule props.getShardCommitQueueExpiryTimeoutInSeconds().getValue().intValue()) .transactionDebugContextEnabled(props.getTransactionDebugContextEnabled()) .customRaftPolicyImplementation(props.getCustomRaftPolicyImplementation()) - .shardSnapshotChunkSize(props.getShardSnapshotChunkSize().getValue().intValue()) + .maximumMessageSliceSize(props.getMaximumMessageSliceSize().getValue().intValue()) .useTellBasedProtocol(props.getUseTellBasedProtocol()) .syncIndexThreshold(props.getSyncIndexThreshold().getValue()) .build(); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/yang/distributed-datastore-provider.yang b/opendaylight/md-sal/sal-distributed-datastore/src/main/yang/distributed-datastore-provider.yang index 49a51a625c..8b3d72d694 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/yang/distributed-datastore-provider.yang +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/yang/distributed-datastore-provider.yang @@ -228,12 +228,20 @@ module distributed-datastore-provider { } leaf shard-snapshot-chunk-size { + status deprecated; default 2048000; type non-zero-uint32-type; description "When sending a snapshot to a follower, this is the maximum size in bytes for a chunk of data."; } + leaf maximum-message-slice-size { + default 2048000; + type non-zero-uint32-type; + description "When fragmenting messages thru the akka remoting framework, this is the + maximum size in bytes for a message slice."; + } + leaf use-tell-based-protocol { default false; type boolean; diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DatastoreContextTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DatastoreContextTest.java index 2aa9a455e3..dd117add08 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DatastoreContextTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DatastoreContextTest.java @@ -13,13 +13,13 @@ import static org.opendaylight.controller.cluster.datastore.DatastoreContext.DEF import static org.opendaylight.controller.cluster.datastore.DatastoreContext.DEFAULT_HEARTBEAT_INTERVAL_IN_MILLIS; import static org.opendaylight.controller.cluster.datastore.DatastoreContext.DEFAULT_ISOLATED_LEADER_CHECK_INTERVAL_IN_MILLIS; import static org.opendaylight.controller.cluster.datastore.DatastoreContext.DEFAULT_JOURNAL_RECOVERY_BATCH_SIZE; +import static org.opendaylight.controller.cluster.datastore.DatastoreContext.DEFAULT_MAX_MESSAGE_SLICE_SIZE; import static org.opendaylight.controller.cluster.datastore.DatastoreContext.DEFAULT_OPERATION_TIMEOUT_IN_MS; import static org.opendaylight.controller.cluster.datastore.DatastoreContext.DEFAULT_PERSISTENT; import static org.opendaylight.controller.cluster.datastore.DatastoreContext.DEFAULT_SHARD_BATCHED_MODIFICATION_COUNT; import static org.opendaylight.controller.cluster.datastore.DatastoreContext.DEFAULT_SHARD_ELECTION_TIMEOUT_FACTOR; import static org.opendaylight.controller.cluster.datastore.DatastoreContext.DEFAULT_SHARD_INITIALIZATION_TIMEOUT; import static org.opendaylight.controller.cluster.datastore.DatastoreContext.DEFAULT_SHARD_LEADER_ELECTION_TIMEOUT; -import static org.opendaylight.controller.cluster.datastore.DatastoreContext.DEFAULT_SHARD_SNAPSHOT_CHUNK_SIZE; import static org.opendaylight.controller.cluster.datastore.DatastoreContext.DEFAULT_SHARD_SNAPSHOT_DATA_THRESHOLD_PERCENTAGE; import static org.opendaylight.controller.cluster.datastore.DatastoreContext.DEFAULT_SHARD_TRANSACTION_IDLE_TIMEOUT; import static org.opendaylight.controller.cluster.datastore.DatastoreContext.DEFAULT_SHARD_TX_COMMIT_QUEUE_CAPACITY; @@ -69,7 +69,7 @@ public class DatastoreContextTest { context.getDataStoreProperties().getMaxDataChangeListenerQueueSize()); assertEquals(InMemoryDOMDataStoreConfigProperties.DEFAULT_MAX_DATA_STORE_EXECUTOR_QUEUE_SIZE, context.getDataStoreProperties().getMaxDataStoreExecutorQueueSize()); - assertEquals(DEFAULT_SHARD_SNAPSHOT_CHUNK_SIZE, context.getShardSnapshotChunkSize()); + assertEquals(DEFAULT_MAX_MESSAGE_SLICE_SIZE, context.getMaximumMessageSliceSize()); } @Test @@ -104,7 +104,7 @@ public class DatastoreContextTest { InMemoryDOMDataStoreConfigProperties.DEFAULT_MAX_DATA_CHANGE_LISTENER_QUEUE_SIZE + 1); builder.maxShardDataStoreExecutorQueueSize( InMemoryDOMDataStoreConfigProperties.DEFAULT_MAX_DATA_STORE_EXECUTOR_QUEUE_SIZE + 1); - builder.shardSnapshotChunkSize(DEFAULT_SHARD_SNAPSHOT_CHUNK_SIZE + 1); + builder.maximumMessageSliceSize(DEFAULT_MAX_MESSAGE_SLICE_SIZE + 1); DatastoreContext context = builder.build(); @@ -155,6 +155,6 @@ public class DatastoreContextTest { context.getDataStoreProperties().getMaxDataChangeListenerQueueSize()); assertEquals(InMemoryDOMDataStoreConfigProperties.DEFAULT_MAX_DATA_STORE_EXECUTOR_QUEUE_SIZE + 1, context.getDataStoreProperties().getMaxDataStoreExecutorQueueSize()); - assertEquals(DEFAULT_SHARD_SNAPSHOT_CHUNK_SIZE + 1, context.getShardSnapshotChunkSize()); + assertEquals(DEFAULT_MAX_MESSAGE_SLICE_SIZE + 1, context.getMaximumMessageSliceSize()); } } -- 2.36.6