# The maximum amount of time a shard transaction can be idle without receiving any messages before it self-destructs.
#shard-transaction-idle-timeout-in-minutes=10
-# The maximum amount of time a shard transaction three-phase commit can be idle without receiving the
+# The maximum amount of time a shard transaction three-phase commit can be idle without receiving the
# next messages before it aborts the transaction.
#shard-transaction-commit-timeout-in-seconds=30
# The maximum allowed capacity for each shard's transaction commit queue.
#shard-transaction-commit-queue-capacity=20000
-# The maximum amount of time to wait for a shard to initialize from persistence on startup before
+# The maximum amount of time to wait for a shard to initialize from persistence on startup before
# failing an operation (eg transaction create and change listener registration).
#shard-initialization-timeout-in-seconds=300
# The percentage of Runtime.totalMemory() used by the in-memory journal log before a snapshot is to be taken.
#shard-snapshot-data-threshold-percentage=12
-# The interval at which the leader of the shard will check if its majority followers are active and
+# The interval at which the leader of the shard will check if its majority followers are active and
# term itself as isolated.
#shard-isolated-leader-check-interval-in-millis=5000
-# The number of transaction modification operations (put, merge, delete) to batch before sending to the
-# shard transaction actor. Batching improves performance as less modifications messages are sent to the
+# The number of transaction modification operations (put, merge, delete) to batch before sending to the
+# shard transaction actor. Batching improves performance as less modifications messages are sent to the
# actor and thus lessens the chance that the transaction actor's mailbox queue could get full.
#shard-batched-modification-count=1000
# The maximum amount of time for akka operations (remote or local) to complete before failing.
#operation-timeout-in-seconds=5
-# The initial number of transactions per second that are allowed before the data store should begin
-# applying back pressure. This number is only used as an initial guidance, subsequently the datastore
+# The initial number of transactions per second that are allowed before the data store should begin
+# applying back pressure. This number is only used as an initial guidance, subsequently the datastore
# measures the latency for a commit and auto-adjusts the rate limit.
#transaction-creation-initial-rate-limit=100
# cannot be found then the default raft policy will be applied
#custom-raft-policy-implementation=
-# The maximum size (in bytes) for snapshot chunks to be sent during sync
-#shard-snapshot-chunk-size=20480000
+# When fragmenting messages thru the akka remoting framework, this is the maximum size in bytes
+# for a message slice.
+#maximum-message-slice-size=20480000
# Enable tell-based protocol between frontend (applications) and backend (shards). Using this protocol
# should avoid AskTimeoutExceptions seen under heavy load. Defaults to false (use ask-based protocol).
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreConfigProperties;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;
public static final int DEFAULT_SHARD_BATCHED_MODIFICATION_COUNT = 1000;
public static final long DEFAULT_SHARD_COMMIT_QUEUE_EXPIRY_TIMEOUT_IN_MS =
TimeUnit.MILLISECONDS.convert(2, TimeUnit.MINUTES);
- public static final int DEFAULT_SHARD_SNAPSHOT_CHUNK_SIZE = 2048000;
+ public static final int DEFAULT_MAX_MESSAGE_SLICE_SIZE = 2048 * 1000; // 2MB
public static final long DEFAULT_SYNC_INDEX_THRESHOLD = 10;
+ private static final Logger LOG = LoggerFactory.getLogger(DatastoreContext.class);
+
private static final Set<String> GLOBAL_DATASTORE_NAMES = ConcurrentHashMap.newKeySet();
private final DefaultConfigParamsImpl raftConfig = new DefaultConfigParamsImpl();
private boolean useTellBasedProtocol = false;
private boolean transactionDebugContextEnabled = false;
private String shardManagerPersistenceId;
+ private int maximumMessageSliceSize = DEFAULT_MAX_MESSAGE_SLICE_SIZE;
public static Set<String> getGlobalDatastoreNames() {
return GLOBAL_DATASTORE_NAMES;
setIsolatedLeaderCheckInterval(DEFAULT_ISOLATED_LEADER_CHECK_INTERVAL_IN_MILLIS);
setSnapshotDataThresholdPercentage(DEFAULT_SHARD_SNAPSHOT_DATA_THRESHOLD_PERCENTAGE);
setElectionTimeoutFactor(DEFAULT_SHARD_ELECTION_TIMEOUT_FACTOR);
- setShardSnapshotChunkSize(DEFAULT_SHARD_SNAPSHOT_CHUNK_SIZE);
setSyncIndexThreshold(DEFAULT_SYNC_INDEX_THRESHOLD);
+ setMaximumMessageSliceSize(DEFAULT_MAX_MESSAGE_SLICE_SIZE);
}
private DatastoreContext(final DatastoreContext other) {
setSnapshotDataThresholdPercentage(other.raftConfig.getSnapshotDataThresholdPercentage());
setElectionTimeoutFactor(other.raftConfig.getElectionTimeoutFactor());
setCustomRaftPolicyImplementation(other.raftConfig.getCustomRaftPolicyImplementationClass());
+ setMaximumMessageSliceSize(other.getMaximumMessageSliceSize());
setShardSnapshotChunkSize(other.raftConfig.getSnapshotChunkSize());
setPeerAddressResolver(other.raftConfig.getPeerAddressResolver());
setTempFileDirectory(other.getTempFileDirectory());
raftConfig.setSnapshotBatchCount(shardSnapshotBatchCount);
}
+ @Deprecated
private void setShardSnapshotChunkSize(final int shardSnapshotChunkSize) {
- raftConfig.setSnapshotChunkSize(shardSnapshotChunkSize);
+ // We'll honor the shardSnapshotChunkSize setting for backwards compatibility but only if it doesn't exceed
+ // maximumMessageSliceSize.
+ if (shardSnapshotChunkSize < maximumMessageSliceSize) {
+ raftConfig.setSnapshotChunkSize(shardSnapshotChunkSize);
+ }
+ }
+
+ private void setMaximumMessageSliceSize(final int maximumMessageSliceSize) {
+ raftConfig.setSnapshotChunkSize(maximumMessageSliceSize);
+ this.maximumMessageSliceSize = maximumMessageSliceSize;
}
private void setSyncIndexThreshold(final long syncIndexThreshold) {
return useTellBasedProtocol;
}
- public int getShardSnapshotChunkSize() {
- return raftConfig.getSnapshotChunkSize();
+ public int getMaximumMessageSliceSize() {
+ return maximumMessageSliceSize;
}
public static class Builder implements org.opendaylight.yangtools.concepts.Builder<DatastoreContext> {
return this;
}
+ @Deprecated
public Builder shardSnapshotChunkSize(final int shardSnapshotChunkSize) {
+ LOG.warn("The shard-snapshot-chunk-size configuration parameter is deprecated - "
+ + "use maximum-message-slice-size instead");
datastoreContext.setShardSnapshotChunkSize(shardSnapshotChunkSize);
return this;
}
+ public Builder maximumMessageSliceSize(final int maximumMessageSliceSize) {
+ datastoreContext.setMaximumMessageSliceSize(maximumMessageSliceSize);
+ return this;
+ }
+
public Builder shardPeerAddressResolver(final PeerAddressResolver resolver) {
datastoreContext.setPeerAddressResolver(resolver);
return this;
int getMaxShardDataStoreExecutorQueueSize();
- int getShardSnapshotChunkSize();
+ int getMaximumMessageSliceSize();
}
}
@Override
- public int getShardSnapshotChunkSize() {
- return context.getShardSnapshotChunkSize();
+ public int getMaximumMessageSliceSize() {
+ return context.getMaximumMessageSliceSize();
}
-
}
props.getShardCommitQueueExpiryTimeoutInSeconds().getValue().intValue())
.transactionDebugContextEnabled(props.getTransactionDebugContextEnabled())
.customRaftPolicyImplementation(props.getCustomRaftPolicyImplementation())
- .shardSnapshotChunkSize(props.getShardSnapshotChunkSize().getValue().intValue())
+ .maximumMessageSliceSize(props.getMaximumMessageSliceSize().getValue().intValue())
.useTellBasedProtocol(props.getUseTellBasedProtocol())
.syncIndexThreshold(props.getSyncIndexThreshold().getValue())
.build();
props.getShardCommitQueueExpiryTimeoutInSeconds().getValue().intValue())
.transactionDebugContextEnabled(props.getTransactionDebugContextEnabled())
.customRaftPolicyImplementation(props.getCustomRaftPolicyImplementation())
- .shardSnapshotChunkSize(props.getShardSnapshotChunkSize().getValue().intValue())
+ .maximumMessageSliceSize(props.getMaximumMessageSliceSize().getValue().intValue())
.useTellBasedProtocol(props.getUseTellBasedProtocol())
.syncIndexThreshold(props.getSyncIndexThreshold().getValue())
.build();
}
leaf shard-snapshot-chunk-size {
+ status deprecated;
default 2048000;
type non-zero-uint32-type;
description "When sending a snapshot to a follower, this is the maximum size in bytes for
a chunk of data.";
}
+ leaf maximum-message-slice-size {
+ default 2048000;
+ type non-zero-uint32-type;
+ description "When fragmenting messages thru the akka remoting framework, this is the
+ maximum size in bytes for a message slice.";
+ }
+
leaf use-tell-based-protocol {
default false;
type boolean;
import static org.opendaylight.controller.cluster.datastore.DatastoreContext.DEFAULT_HEARTBEAT_INTERVAL_IN_MILLIS;
import static org.opendaylight.controller.cluster.datastore.DatastoreContext.DEFAULT_ISOLATED_LEADER_CHECK_INTERVAL_IN_MILLIS;
import static org.opendaylight.controller.cluster.datastore.DatastoreContext.DEFAULT_JOURNAL_RECOVERY_BATCH_SIZE;
+import static org.opendaylight.controller.cluster.datastore.DatastoreContext.DEFAULT_MAX_MESSAGE_SLICE_SIZE;
import static org.opendaylight.controller.cluster.datastore.DatastoreContext.DEFAULT_OPERATION_TIMEOUT_IN_MS;
import static org.opendaylight.controller.cluster.datastore.DatastoreContext.DEFAULT_PERSISTENT;
import static org.opendaylight.controller.cluster.datastore.DatastoreContext.DEFAULT_SHARD_BATCHED_MODIFICATION_COUNT;
import static org.opendaylight.controller.cluster.datastore.DatastoreContext.DEFAULT_SHARD_ELECTION_TIMEOUT_FACTOR;
import static org.opendaylight.controller.cluster.datastore.DatastoreContext.DEFAULT_SHARD_INITIALIZATION_TIMEOUT;
import static org.opendaylight.controller.cluster.datastore.DatastoreContext.DEFAULT_SHARD_LEADER_ELECTION_TIMEOUT;
-import static org.opendaylight.controller.cluster.datastore.DatastoreContext.DEFAULT_SHARD_SNAPSHOT_CHUNK_SIZE;
import static org.opendaylight.controller.cluster.datastore.DatastoreContext.DEFAULT_SHARD_SNAPSHOT_DATA_THRESHOLD_PERCENTAGE;
import static org.opendaylight.controller.cluster.datastore.DatastoreContext.DEFAULT_SHARD_TRANSACTION_IDLE_TIMEOUT;
import static org.opendaylight.controller.cluster.datastore.DatastoreContext.DEFAULT_SHARD_TX_COMMIT_QUEUE_CAPACITY;
context.getDataStoreProperties().getMaxDataChangeListenerQueueSize());
assertEquals(InMemoryDOMDataStoreConfigProperties.DEFAULT_MAX_DATA_STORE_EXECUTOR_QUEUE_SIZE,
context.getDataStoreProperties().getMaxDataStoreExecutorQueueSize());
- assertEquals(DEFAULT_SHARD_SNAPSHOT_CHUNK_SIZE, context.getShardSnapshotChunkSize());
+ assertEquals(DEFAULT_MAX_MESSAGE_SLICE_SIZE, context.getMaximumMessageSliceSize());
}
@Test
InMemoryDOMDataStoreConfigProperties.DEFAULT_MAX_DATA_CHANGE_LISTENER_QUEUE_SIZE + 1);
builder.maxShardDataStoreExecutorQueueSize(
InMemoryDOMDataStoreConfigProperties.DEFAULT_MAX_DATA_STORE_EXECUTOR_QUEUE_SIZE + 1);
- builder.shardSnapshotChunkSize(DEFAULT_SHARD_SNAPSHOT_CHUNK_SIZE + 1);
+ builder.maximumMessageSliceSize(DEFAULT_MAX_MESSAGE_SLICE_SIZE + 1);
DatastoreContext context = builder.build();
context.getDataStoreProperties().getMaxDataChangeListenerQueueSize());
assertEquals(InMemoryDOMDataStoreConfigProperties.DEFAULT_MAX_DATA_STORE_EXECUTOR_QUEUE_SIZE + 1,
context.getDataStoreProperties().getMaxDataStoreExecutorQueueSize());
- assertEquals(DEFAULT_SHARD_SNAPSHOT_CHUNK_SIZE + 1, context.getShardSnapshotChunkSize());
+ assertEquals(DEFAULT_MAX_MESSAGE_SLICE_SIZE + 1, context.getMaximumMessageSliceSize());
}
}