Bug 7449: Add maximum-message-slice-size config param 94/59294/3
authorTom Pantelis <tompantelis@gmail.com>
Wed, 21 Jun 2017 14:53:53 +0000 (10:53 -0400)
committerRobert Varga <nite@hq.sk>
Thu, 22 Jun 2017 20:41:33 +0000 (20:41 +0000)
Added a new maximum-message-slice-size config param that will be used
when fragmenting messages thru the akka remoting framework. This is
a generalized version of the shard-snapshot-chunk-size param and
replaces it.

Change-Id: I4dc4cc0de92d6f876e5587cd8cb3ade2abb59285
Signed-off-by: Tom Pantelis <tompantelis@gmail.com>
opendaylight/md-sal/sal-clustering-config/src/main/resources/initial/datastore.cfg
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContext.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/jmx/mbeans/DatastoreConfigurationMXBean.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/jmx/mbeans/DatastoreConfigurationMXBeanImpl.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/distributed_datastore_provider/DistributedConfigDataStoreProviderModule.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/distributed_datastore_provider/DistributedOperationalDataStoreProviderModule.java
opendaylight/md-sal/sal-distributed-datastore/src/main/yang/distributed-datastore-provider.yang
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DatastoreContextTest.java

index 3ab714a9acd16f9a22ffebbeb63276fb73afc239..853c8f0d19ab47f63a263846b95a112a9e99c16f 100644 (file)
@@ -24,14 +24,14 @@ operational.persistent=false
 # The maximum amount of time a shard transaction can be idle without receiving any messages before it self-destructs.
 #shard-transaction-idle-timeout-in-minutes=10
 
-# The maximum amount of time a shard transaction three-phase commit can be idle without receiving the 
+# The maximum amount of time a shard transaction three-phase commit can be idle without receiving the
 # next messages before it aborts the transaction.
 #shard-transaction-commit-timeout-in-seconds=30
 
 # The maximum allowed capacity for each shard's transaction commit queue.
 #shard-transaction-commit-queue-capacity=20000
 
-# The maximum amount of time to wait for a shard to initialize from persistence on startup before 
+# The maximum amount of time to wait for a shard to initialize from persistence on startup before
 # failing an operation (eg transaction create and change listener registration).
 #shard-initialization-timeout-in-seconds=300
 
@@ -41,20 +41,20 @@ operational.persistent=false
 # The percentage of Runtime.totalMemory() used by the in-memory journal log before a snapshot is to be taken.
 #shard-snapshot-data-threshold-percentage=12
 
-# The interval at which the leader of the shard will check if its majority followers are active and 
+# The interval at which the leader of the shard will check if its majority followers are active and
 # term itself as isolated.
 #shard-isolated-leader-check-interval-in-millis=5000
 
-# The number of transaction modification operations (put, merge, delete) to batch before sending to the 
-# shard transaction actor. Batching improves performance as less modifications messages are sent to the 
+# The number of transaction modification operations (put, merge, delete) to batch before sending to the
+# shard transaction actor. Batching improves performance as less modifications messages are sent to the
 # actor and thus lessens the chance that the transaction actor's mailbox queue could get full.
 #shard-batched-modification-count=1000
 
 # The maximum amount of time for akka operations (remote or local) to complete before failing.
 #operation-timeout-in-seconds=5
 
-# The initial number of transactions per second that are allowed before the data store should begin 
-# applying back pressure. This number is only used as an initial guidance, subsequently the datastore 
+# The initial number of transactions per second that are allowed before the data store should begin
+# applying back pressure. This number is only used as an initial guidance, subsequently the datastore
 # measures the latency for a commit and auto-adjusts the rate limit.
 #transaction-creation-initial-rate-limit=100
 
@@ -78,8 +78,9 @@ operational.persistent=false
 # cannot be found then the default raft policy will be applied
 #custom-raft-policy-implementation=
 
-# The maximum size (in bytes) for snapshot chunks to be sent during sync
-#shard-snapshot-chunk-size=20480000
+# When fragmenting messages thru the akka remoting framework, this is the maximum size in bytes
+# for a message slice.
+#maximum-message-slice-size=20480000
 
 # Enable tell-based protocol between frontend (applications) and backend (shards). Using this protocol
 # should avoid AskTimeoutExceptions seen under heavy load. Defaults to false (use ask-based protocol).
index 02f2768fbb2a74f3fce79994800dbfb88da774d9..07115b71699095e4891d819759dc5a3016aad6c0 100644 (file)
@@ -23,6 +23,8 @@ import org.opendaylight.controller.cluster.raft.PeerAddressResolver;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreConfigProperties;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import scala.concurrent.duration.Duration;
 import scala.concurrent.duration.FiniteDuration;
 
@@ -54,10 +56,12 @@ public class DatastoreContext {
     public static final int DEFAULT_SHARD_BATCHED_MODIFICATION_COUNT = 1000;
     public static final long DEFAULT_SHARD_COMMIT_QUEUE_EXPIRY_TIMEOUT_IN_MS =
             TimeUnit.MILLISECONDS.convert(2, TimeUnit.MINUTES);
-    public static final int DEFAULT_SHARD_SNAPSHOT_CHUNK_SIZE = 2048000;
+    public static final int DEFAULT_MAX_MESSAGE_SLICE_SIZE = 2048 * 1000; // 2MB
 
     public static final long DEFAULT_SYNC_INDEX_THRESHOLD = 10;
 
+    private static final Logger LOG = LoggerFactory.getLogger(DatastoreContext.class);
+
     private static final Set<String> GLOBAL_DATASTORE_NAMES = ConcurrentHashMap.newKeySet();
 
     private final DefaultConfigParamsImpl raftConfig = new DefaultConfigParamsImpl();
@@ -82,6 +86,7 @@ public class DatastoreContext {
     private boolean useTellBasedProtocol = false;
     private boolean transactionDebugContextEnabled = false;
     private String shardManagerPersistenceId;
+    private int maximumMessageSliceSize = DEFAULT_MAX_MESSAGE_SLICE_SIZE;
 
     public static Set<String> getGlobalDatastoreNames() {
         return GLOBAL_DATASTORE_NAMES;
@@ -94,8 +99,8 @@ public class DatastoreContext {
         setIsolatedLeaderCheckInterval(DEFAULT_ISOLATED_LEADER_CHECK_INTERVAL_IN_MILLIS);
         setSnapshotDataThresholdPercentage(DEFAULT_SHARD_SNAPSHOT_DATA_THRESHOLD_PERCENTAGE);
         setElectionTimeoutFactor(DEFAULT_SHARD_ELECTION_TIMEOUT_FACTOR);
-        setShardSnapshotChunkSize(DEFAULT_SHARD_SNAPSHOT_CHUNK_SIZE);
         setSyncIndexThreshold(DEFAULT_SYNC_INDEX_THRESHOLD);
+        setMaximumMessageSliceSize(DEFAULT_MAX_MESSAGE_SLICE_SIZE);
     }
 
     private DatastoreContext(final DatastoreContext other) {
@@ -127,6 +132,7 @@ public class DatastoreContext {
         setSnapshotDataThresholdPercentage(other.raftConfig.getSnapshotDataThresholdPercentage());
         setElectionTimeoutFactor(other.raftConfig.getElectionTimeoutFactor());
         setCustomRaftPolicyImplementation(other.raftConfig.getCustomRaftPolicyImplementationClass());
+        setMaximumMessageSliceSize(other.getMaximumMessageSliceSize());
         setShardSnapshotChunkSize(other.raftConfig.getSnapshotChunkSize());
         setPeerAddressResolver(other.raftConfig.getPeerAddressResolver());
         setTempFileDirectory(other.getTempFileDirectory());
@@ -263,8 +269,18 @@ public class DatastoreContext {
         raftConfig.setSnapshotBatchCount(shardSnapshotBatchCount);
     }
 
+    @Deprecated
     private void setShardSnapshotChunkSize(final int shardSnapshotChunkSize) {
-        raftConfig.setSnapshotChunkSize(shardSnapshotChunkSize);
+        // We'll honor the shardSnapshotChunkSize setting for backwards compatibility but only if it doesn't exceed
+        // maximumMessageSliceSize.
+        if (shardSnapshotChunkSize < maximumMessageSliceSize) {
+            raftConfig.setSnapshotChunkSize(shardSnapshotChunkSize);
+        }
+    }
+
+    private void setMaximumMessageSliceSize(final int maximumMessageSliceSize) {
+        raftConfig.setSnapshotChunkSize(maximumMessageSliceSize);
+        this.maximumMessageSliceSize = maximumMessageSliceSize;
     }
 
     private void setSyncIndexThreshold(final long syncIndexThreshold) {
@@ -291,8 +307,8 @@ public class DatastoreContext {
         return useTellBasedProtocol;
     }
 
-    public int getShardSnapshotChunkSize() {
-        return raftConfig.getSnapshotChunkSize();
+    public int getMaximumMessageSliceSize() {
+        return maximumMessageSliceSize;
     }
 
     public static class Builder implements org.opendaylight.yangtools.concepts.Builder<DatastoreContext> {
@@ -523,11 +539,19 @@ public class DatastoreContext {
             return this;
         }
 
+        @Deprecated
         public Builder shardSnapshotChunkSize(final int shardSnapshotChunkSize) {
+            LOG.warn("The shard-snapshot-chunk-size configuration parameter is deprecated - "
+                    + "use maximum-message-slice-size instead");
             datastoreContext.setShardSnapshotChunkSize(shardSnapshotChunkSize);
             return this;
         }
 
+        public Builder maximumMessageSliceSize(final int maximumMessageSliceSize) {
+            datastoreContext.setMaximumMessageSliceSize(maximumMessageSliceSize);
+            return this;
+        }
+
         public Builder shardPeerAddressResolver(final PeerAddressResolver resolver) {
             datastoreContext.setPeerAddressResolver(resolver);
             return this;
index b2ff9fba40ab5fc7df32397b4d91eaa5c2bcca40..20b6e79e075fb4674f4ff3e6e73f76b6d61c45cb 100644 (file)
@@ -130,8 +130,7 @@ public class DatastoreConfigurationMXBeanImpl extends AbstractMXBean implements
     }
 
     @Override
-    public int getShardSnapshotChunkSize() {
-        return context.getShardSnapshotChunkSize();
+    public int getMaximumMessageSliceSize() {
+        return context.getMaximumMessageSliceSize();
     }
-
 }
index a99b8c3a986d8ef50dc46404c46e9b5a84c231bc..f0938e89941318fa183de13bc6e5465b64ec694b 100644 (file)
@@ -96,7 +96,7 @@ public class DistributedConfigDataStoreProviderModule extends AbstractDistribute
                         props.getShardCommitQueueExpiryTimeoutInSeconds().getValue().intValue())
                 .transactionDebugContextEnabled(props.getTransactionDebugContextEnabled())
                 .customRaftPolicyImplementation(props.getCustomRaftPolicyImplementation())
-                .shardSnapshotChunkSize(props.getShardSnapshotChunkSize().getValue().intValue())
+                .maximumMessageSliceSize(props.getMaximumMessageSliceSize().getValue().intValue())
                 .useTellBasedProtocol(props.getUseTellBasedProtocol())
                 .syncIndexThreshold(props.getSyncIndexThreshold().getValue())
                 .build();
index 44146aa8bb98a48567696d282386fc32075e053b..a31d5177f0dae5621eb12e3e00c230abfc4799a7 100644 (file)
@@ -96,7 +96,7 @@ public class DistributedOperationalDataStoreProviderModule
                         props.getShardCommitQueueExpiryTimeoutInSeconds().getValue().intValue())
                 .transactionDebugContextEnabled(props.getTransactionDebugContextEnabled())
                 .customRaftPolicyImplementation(props.getCustomRaftPolicyImplementation())
-                .shardSnapshotChunkSize(props.getShardSnapshotChunkSize().getValue().intValue())
+                .maximumMessageSliceSize(props.getMaximumMessageSliceSize().getValue().intValue())
                 .useTellBasedProtocol(props.getUseTellBasedProtocol())
                 .syncIndexThreshold(props.getSyncIndexThreshold().getValue())
                 .build();
index 49a51a625c9e7a4c9ce21f73c17c2c2e72d43357..8b3d72d6948ac1b3976c5b9c14e297d1e0eb8454 100644 (file)
@@ -228,12 +228,20 @@ module distributed-datastore-provider {
         }
 
         leaf shard-snapshot-chunk-size {
+            status deprecated;
             default 2048000;
             type non-zero-uint32-type;
             description "When sending a snapshot to a follower, this is the maximum size in bytes for
                          a chunk of data.";
         }
 
+        leaf maximum-message-slice-size {
+            default 2048000;
+            type non-zero-uint32-type;
+            description "When fragmenting messages thru the akka remoting framework, this is the
+                         maximum size in bytes for a message slice.";
+        }
+
         leaf use-tell-based-protocol {
             default false;
             type boolean;
index 2aa9a455e3e93e98780ce9cf37ce4405f6e62a56..dd117add08d823bd6086ae1395447747bfc0872b 100644 (file)
@@ -13,13 +13,13 @@ import static org.opendaylight.controller.cluster.datastore.DatastoreContext.DEF
 import static org.opendaylight.controller.cluster.datastore.DatastoreContext.DEFAULT_HEARTBEAT_INTERVAL_IN_MILLIS;
 import static org.opendaylight.controller.cluster.datastore.DatastoreContext.DEFAULT_ISOLATED_LEADER_CHECK_INTERVAL_IN_MILLIS;
 import static org.opendaylight.controller.cluster.datastore.DatastoreContext.DEFAULT_JOURNAL_RECOVERY_BATCH_SIZE;
+import static org.opendaylight.controller.cluster.datastore.DatastoreContext.DEFAULT_MAX_MESSAGE_SLICE_SIZE;
 import static org.opendaylight.controller.cluster.datastore.DatastoreContext.DEFAULT_OPERATION_TIMEOUT_IN_MS;
 import static org.opendaylight.controller.cluster.datastore.DatastoreContext.DEFAULT_PERSISTENT;
 import static org.opendaylight.controller.cluster.datastore.DatastoreContext.DEFAULT_SHARD_BATCHED_MODIFICATION_COUNT;
 import static org.opendaylight.controller.cluster.datastore.DatastoreContext.DEFAULT_SHARD_ELECTION_TIMEOUT_FACTOR;
 import static org.opendaylight.controller.cluster.datastore.DatastoreContext.DEFAULT_SHARD_INITIALIZATION_TIMEOUT;
 import static org.opendaylight.controller.cluster.datastore.DatastoreContext.DEFAULT_SHARD_LEADER_ELECTION_TIMEOUT;
-import static org.opendaylight.controller.cluster.datastore.DatastoreContext.DEFAULT_SHARD_SNAPSHOT_CHUNK_SIZE;
 import static org.opendaylight.controller.cluster.datastore.DatastoreContext.DEFAULT_SHARD_SNAPSHOT_DATA_THRESHOLD_PERCENTAGE;
 import static org.opendaylight.controller.cluster.datastore.DatastoreContext.DEFAULT_SHARD_TRANSACTION_IDLE_TIMEOUT;
 import static org.opendaylight.controller.cluster.datastore.DatastoreContext.DEFAULT_SHARD_TX_COMMIT_QUEUE_CAPACITY;
@@ -69,7 +69,7 @@ public class DatastoreContextTest {
                 context.getDataStoreProperties().getMaxDataChangeListenerQueueSize());
         assertEquals(InMemoryDOMDataStoreConfigProperties.DEFAULT_MAX_DATA_STORE_EXECUTOR_QUEUE_SIZE,
                 context.getDataStoreProperties().getMaxDataStoreExecutorQueueSize());
-        assertEquals(DEFAULT_SHARD_SNAPSHOT_CHUNK_SIZE, context.getShardSnapshotChunkSize());
+        assertEquals(DEFAULT_MAX_MESSAGE_SLICE_SIZE, context.getMaximumMessageSliceSize());
     }
 
     @Test
@@ -104,7 +104,7 @@ public class DatastoreContextTest {
                 InMemoryDOMDataStoreConfigProperties.DEFAULT_MAX_DATA_CHANGE_LISTENER_QUEUE_SIZE + 1);
         builder.maxShardDataStoreExecutorQueueSize(
                 InMemoryDOMDataStoreConfigProperties.DEFAULT_MAX_DATA_STORE_EXECUTOR_QUEUE_SIZE + 1);
-        builder.shardSnapshotChunkSize(DEFAULT_SHARD_SNAPSHOT_CHUNK_SIZE + 1);
+        builder.maximumMessageSliceSize(DEFAULT_MAX_MESSAGE_SLICE_SIZE + 1);
 
         DatastoreContext context = builder.build();
 
@@ -155,6 +155,6 @@ public class DatastoreContextTest {
                 context.getDataStoreProperties().getMaxDataChangeListenerQueueSize());
         assertEquals(InMemoryDOMDataStoreConfigProperties.DEFAULT_MAX_DATA_STORE_EXECUTOR_QUEUE_SIZE + 1,
                 context.getDataStoreProperties().getMaxDataStoreExecutorQueueSize());
-        assertEquals(DEFAULT_SHARD_SNAPSHOT_CHUNK_SIZE + 1, context.getShardSnapshotChunkSize());
+        assertEquals(DEFAULT_MAX_MESSAGE_SLICE_SIZE + 1, context.getMaximumMessageSliceSize());
     }
 }