import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.text.WordUtils;
+import org.opendaylight.controller.cluster.access.client.AbstractClientConnection;
+import org.opendaylight.controller.cluster.access.client.ClientActorConfig;
import org.opendaylight.controller.cluster.common.actor.AkkaConfigurationReader;
import org.opendaylight.controller.cluster.common.actor.FileAkkaConfigurationReader;
import org.opendaylight.controller.cluster.raft.ConfigParams;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreConfigProperties;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;
*
* @author Thomas Pantelis
*/
-public class DatastoreContext {
+public class DatastoreContext implements ClientActorConfig {
public static final String METRICS_DOMAIN = "org.opendaylight.controller.cluster.datastore";
public static final Duration DEFAULT_SHARD_TRANSACTION_IDLE_TIMEOUT = Duration.create(10, TimeUnit.MINUTES);
public static final int DEFAULT_SHARD_BATCHED_MODIFICATION_COUNT = 1000;
public static final long DEFAULT_SHARD_COMMIT_QUEUE_EXPIRY_TIMEOUT_IN_MS =
TimeUnit.MILLISECONDS.convert(2, TimeUnit.MINUTES);
- public static final int DEFAULT_SHARD_SNAPSHOT_CHUNK_SIZE = 2048000;
+ public static final int DEFAULT_MAX_MESSAGE_SLICE_SIZE = 2048 * 1000; // 2MB
public static final long DEFAULT_SYNC_INDEX_THRESHOLD = 10;
+ private static final Logger LOG = LoggerFactory.getLogger(DatastoreContext.class);
+
private static final Set<String> GLOBAL_DATASTORE_NAMES = ConcurrentHashMap.newKeySet();
private final DefaultConfigParamsImpl raftConfig = new DefaultConfigParamsImpl();
private boolean useTellBasedProtocol = false;
private boolean transactionDebugContextEnabled = false;
private String shardManagerPersistenceId;
+ private int maximumMessageSliceSize = DEFAULT_MAX_MESSAGE_SLICE_SIZE;
+ private long backendAlivenessTimerInterval = AbstractClientConnection.DEFAULT_BACKEND_ALIVE_TIMEOUT_NANOS;
+ private long requestTimeout = AbstractClientConnection.DEFAULT_REQUEST_TIMEOUT_NANOS;
+ private long noProgressTimeout = AbstractClientConnection.DEFAULT_NO_PROGRESS_TIMEOUT_NANOS;
public static Set<String> getGlobalDatastoreNames() {
return GLOBAL_DATASTORE_NAMES;
setIsolatedLeaderCheckInterval(DEFAULT_ISOLATED_LEADER_CHECK_INTERVAL_IN_MILLIS);
setSnapshotDataThresholdPercentage(DEFAULT_SHARD_SNAPSHOT_DATA_THRESHOLD_PERCENTAGE);
setElectionTimeoutFactor(DEFAULT_SHARD_ELECTION_TIMEOUT_FACTOR);
- setShardSnapshotChunkSize(DEFAULT_SHARD_SNAPSHOT_CHUNK_SIZE);
setSyncIndexThreshold(DEFAULT_SYNC_INDEX_THRESHOLD);
+ setMaximumMessageSliceSize(DEFAULT_MAX_MESSAGE_SLICE_SIZE);
}
private DatastoreContext(final DatastoreContext other) {
this.transactionDebugContextEnabled = other.transactionDebugContextEnabled;
this.shardManagerPersistenceId = other.shardManagerPersistenceId;
this.useTellBasedProtocol = other.useTellBasedProtocol;
+ this.backendAlivenessTimerInterval = other.backendAlivenessTimerInterval;
+ this.requestTimeout = other.requestTimeout;
+ this.noProgressTimeout = other.noProgressTimeout;
setShardJournalRecoveryLogBatchSize(other.raftConfig.getJournalRecoveryLogBatchSize());
setSnapshotBatchCount(other.raftConfig.getSnapshotBatchCount());
setSnapshotDataThresholdPercentage(other.raftConfig.getSnapshotDataThresholdPercentage());
setElectionTimeoutFactor(other.raftConfig.getElectionTimeoutFactor());
setCustomRaftPolicyImplementation(other.raftConfig.getCustomRaftPolicyImplementationClass());
+ setMaximumMessageSliceSize(other.getMaximumMessageSliceSize());
setShardSnapshotChunkSize(other.raftConfig.getSnapshotChunkSize());
setPeerAddressResolver(other.raftConfig.getPeerAddressResolver());
setTempFileDirectory(other.getTempFileDirectory());
return shardManagerPersistenceId;
}
+ @Override
public String getTempFileDirectory() {
return raftConfig.getTempFileDirectory();
}
raftConfig.setTempFileDirectory(tempFileDirectory);
}
+ @Override
public int getFileBackedStreamingThreshold() {
return raftConfig.getFileBackedStreamingThreshold();
}
raftConfig.setSnapshotBatchCount(shardSnapshotBatchCount);
}
+ @Deprecated
private void setShardSnapshotChunkSize(final int shardSnapshotChunkSize) {
- raftConfig.setSnapshotChunkSize(shardSnapshotChunkSize);
+ // We'll honor the shardSnapshotChunkSize setting for backwards compatibility but only if it doesn't exceed
+ // maximumMessageSliceSize.
+ if (shardSnapshotChunkSize < maximumMessageSliceSize) {
+ raftConfig.setSnapshotChunkSize(shardSnapshotChunkSize);
+ }
+ }
+
+ private void setMaximumMessageSliceSize(final int maximumMessageSliceSize) {
+ raftConfig.setSnapshotChunkSize(maximumMessageSliceSize);
+ this.maximumMessageSliceSize = maximumMessageSliceSize;
}
private void setSyncIndexThreshold(final long syncIndexThreshold) {
return useTellBasedProtocol;
}
- public int getShardSnapshotChunkSize() {
- return raftConfig.getSnapshotChunkSize();
+ @Override
+ public int getMaximumMessageSliceSize() {
+ return maximumMessageSliceSize;
+ }
+
+ @Override
+ public long getBackendAlivenessTimerInterval() {
+ return backendAlivenessTimerInterval;
+ }
+
+ @Override
+ public long getRequestTimeout() {
+ return requestTimeout;
+ }
+
+ @Override
+ public long getNoProgressTimeout() {
+ return noProgressTimeout;
}
public static class Builder implements org.opendaylight.yangtools.concepts.Builder<DatastoreContext> {
return this;
}
+ @Deprecated
public Builder shardSnapshotChunkSize(final int shardSnapshotChunkSize) {
+ LOG.warn("The shard-snapshot-chunk-size configuration parameter is deprecated - "
+ + "use maximum-message-slice-size instead");
datastoreContext.setShardSnapshotChunkSize(shardSnapshotChunkSize);
return this;
}
+ public Builder maximumMessageSliceSize(final int maximumMessageSliceSize) {
+ datastoreContext.setMaximumMessageSliceSize(maximumMessageSliceSize);
+ return this;
+ }
+
public Builder shardPeerAddressResolver(final PeerAddressResolver resolver) {
datastoreContext.setPeerAddressResolver(resolver);
return this;
return this;
}
- public Builder fileBackedStreamingThresholdInMegabytes(final int fileBackedStreamingThreshold) {
+ public Builder fileBackedStreamingThresholdInMegabytes(final int fileBackedStreamingThreshold) {
datastoreContext.setFileBackedStreamingThreshold(fileBackedStreamingThreshold * ConfigParams.MEGABYTE);
return this;
}
return this;
}
+ public Builder backendAlivenessTimerIntervalInSeconds(final long interval) {
+ datastoreContext.backendAlivenessTimerInterval = TimeUnit.SECONDS.toNanos(interval);
+ return this;
+ }
+
+ public Builder frontendRequestTimeoutInSeconds(final long timeout) {
+ datastoreContext.requestTimeout = TimeUnit.SECONDS.toNanos(timeout);
+ return this;
+ }
+
+ public Builder frontendNoProgressTimeoutInSeconds(final long timeout) {
+ datastoreContext.noProgressTimeout = TimeUnit.SECONDS.toNanos(timeout);
+ return this;
+ }
+
@Override
public DatastoreContext build() {
datastoreContext.dataStoreProperties = InMemoryDOMDataStoreConfigProperties.create(