public static final FiniteDuration HEART_BEAT_INTERVAL =
new FiniteDuration(100, TimeUnit.MILLISECONDS);
+ private final Supplier<RaftPolicy> policySupplier = Suppliers.memoize(this::getPolicy);
+
private FiniteDuration heartBeatInterval = HEART_BEAT_INTERVAL;
private long snapshotBatchCount = SNAPSHOT_BATCH_COUNT;
private int journalRecoveryLogBatchSize = JOURNAL_RECOVERY_LOG_BATCH_SIZE;
private long electionTimeoutFactor = 2;
private String customRaftPolicyImplementationClass;
- private final Supplier<RaftPolicy> policySupplier = Suppliers.memoize(new PolicySupplier());
-
private PeerAddressResolver peerAddressResolver = NoopPeerAddressResolver.INSTANCE;
private String tempFileDirectory = "";
private int fileBackedStreamingThreshold = 128 * MEGABYTE;
- public void setHeartBeatInterval(FiniteDuration heartBeatInterval) {
+ private long syncIndexThreshold = 10;
+
+ public void setHeartBeatInterval(final FiniteDuration heartBeatInterval) {
this.heartBeatInterval = heartBeatInterval;
electionTimeOutInterval = null;
}
- public void setSnapshotBatchCount(long snapshotBatchCount) {
+ public void setSnapshotBatchCount(final long snapshotBatchCount) {
this.snapshotBatchCount = snapshotBatchCount;
}
- public void setSnapshotDataThresholdPercentage(int snapshotDataThresholdPercentage) {
+ public void setSnapshotDataThresholdPercentage(final int snapshotDataThresholdPercentage) {
this.snapshotDataThresholdPercentage = snapshotDataThresholdPercentage;
}
- public void setSnapshotChunkSize(int snapshotChunkSize) {
+ public void setSnapshotChunkSize(final int snapshotChunkSize) {
this.snapshotChunkSize = snapshotChunkSize;
}
- public void setJournalRecoveryLogBatchSize(int journalRecoveryLogBatchSize) {
+ public void setJournalRecoveryLogBatchSize(final int journalRecoveryLogBatchSize) {
this.journalRecoveryLogBatchSize = journalRecoveryLogBatchSize;
}
- public void setIsolatedLeaderCheckInterval(FiniteDuration isolatedLeaderCheckInterval) {
+ public void setIsolatedLeaderCheckInterval(final FiniteDuration isolatedLeaderCheckInterval) {
this.isolatedLeaderCheckInterval = isolatedLeaderCheckInterval.toMillis();
}
- public void setElectionTimeoutFactor(long electionTimeoutFactor) {
+ public void setElectionTimeoutFactor(final long electionTimeoutFactor) {
this.electionTimeoutFactor = electionTimeoutFactor;
electionTimeOutInterval = null;
}
- public void setTempFileDirectory(String tempFileDirectory) {
+ public void setTempFileDirectory(final String tempFileDirectory) {
this.tempFileDirectory = tempFileDirectory;
}
- public void setFileBackedStreamingThreshold(int fileBackedStreamingThreshold) {
+ public void setFileBackedStreamingThreshold(final int fileBackedStreamingThreshold) {
this.fileBackedStreamingThreshold = fileBackedStreamingThreshold;
}
- public void setCustomRaftPolicyImplementationClass(String customRaftPolicyImplementationClass) {
+ public void setCustomRaftPolicyImplementationClass(final String customRaftPolicyImplementationClass) {
this.customRaftPolicyImplementationClass = customRaftPolicyImplementationClass;
}
return fileBackedStreamingThreshold;
}
- private class PolicySupplier implements Supplier<RaftPolicy> {
- @Override
- @SuppressWarnings("checkstyle:IllegalCatch")
- public RaftPolicy get() {
- if (Strings.isNullOrEmpty(DefaultConfigParamsImpl.this.customRaftPolicyImplementationClass)) {
- LOG.debug("No custom RaftPolicy specified. Using DefaultRaftPolicy");
- return DefaultRaftPolicy.INSTANCE;
- }
-
- try {
- String className = DefaultConfigParamsImpl.this.customRaftPolicyImplementationClass;
- LOG.info("Trying to use custom RaftPolicy {}", className);
- return (RaftPolicy)Class.forName(className).newInstance();
- } catch (Exception e) {
- if (LOG.isDebugEnabled()) {
- LOG.error("Could not create custom raft policy, will stick with default", e);
- } else {
- LOG.error("Could not create custom raft policy, will stick with default : cause = {}",
- e.getMessage());
- }
- }
- return DefaultRaftPolicy.INSTANCE;
- }
- }
@Override
public PeerAddressResolver getPeerAddressResolver() {
return peerAddressResolver;
}
- public void setPeerAddressResolver(@Nonnull PeerAddressResolver peerAddressResolver) {
+ public void setPeerAddressResolver(@Nonnull final PeerAddressResolver peerAddressResolver) {
this.peerAddressResolver = Preconditions.checkNotNull(peerAddressResolver);
}
+
+ @Override
+ public long getSyncIndexThreshold() {
+ return syncIndexThreshold;
+ }
+
+ public void setSyncIndexThreshold(final long syncIndexThreshold) {
+ Preconditions.checkArgument(syncIndexThreshold >= 0);
+ this.syncIndexThreshold = syncIndexThreshold;
+ }
+
+ @SuppressWarnings("checkstyle:IllegalCatch")
+ private RaftPolicy getPolicy() {
+ if (Strings.isNullOrEmpty(DefaultConfigParamsImpl.this.customRaftPolicyImplementationClass)) {
+ LOG.debug("No custom RaftPolicy specified. Using DefaultRaftPolicy");
+ return DefaultRaftPolicy.INSTANCE;
+ }
+
+ try {
+ String className = DefaultConfigParamsImpl.this.customRaftPolicyImplementationClass;
+ LOG.info("Trying to use custom RaftPolicy {}", className);
+ return (RaftPolicy)Class.forName(className).newInstance();
+ } catch (Exception e) {
+ if (LOG.isDebugEnabled()) {
+ LOG.error("Could not create custom raft policy, will stick with default", e);
+ } else {
+ LOG.error("Could not create custom raft policy, will stick with default : cause = {}",
+ e.getMessage());
+ }
+ }
+ return DefaultRaftPolicy.INSTANCE;
+ }
}
* </ul>
*/
public class Follower extends AbstractRaftActorBehavior {
- private static final int SYNC_THRESHOLD = 10;
-
private static final long MAX_ELECTION_TIMEOUT_FACTOR = 18;
private final SyncStatusTracker initialSyncStatusTracker;
private String leaderId;
private short leaderPayloadVersion;
- public Follower(RaftActorContext context) {
+ public Follower(final RaftActorContext context) {
this(context, null, (short)-1);
}
- public Follower(RaftActorContext context, String initialLeaderId, short initialLeaderPayloadVersion) {
+ public Follower(final RaftActorContext context, final String initialLeaderId,
+ final short initialLeaderPayloadVersion) {
super(context, RaftState.Follower);
this.leaderId = initialLeaderId;
this.leaderPayloadVersion = initialLeaderPayloadVersion;
- initialSyncStatusTracker = new SyncStatusTracker(context.getActor(), getId(), SYNC_THRESHOLD);
+ initialSyncStatusTracker = new SyncStatusTracker(context.getActor(), getId(), context.getConfigParams()
+ .getSyncIndexThreshold());
if (context.getPeerIds().isEmpty() && getLeaderId() == null) {
actor().tell(TimeoutNow.INSTANCE, actor());
}
@VisibleForTesting
- protected final void setLeaderPayloadVersion(short leaderPayloadVersion) {
+ protected final void setLeaderPayloadVersion(final short leaderPayloadVersion) {
this.leaderPayloadVersion = leaderPayloadVersion;
}
lastLeaderMessageTimer.start();
}
- private boolean isLogEntryPresent(long index) {
+ private boolean isLogEntryPresent(final long index) {
if (context.getReplicatedLog().isInSnapshot(index)) {
return true;
}
}
- private void updateInitialSyncStatus(long currentLeaderCommit, String newLeaderId) {
+ private void updateInitialSyncStatus(final long currentLeaderCommit, final String newLeaderId) {
initialSyncStatusTracker.update(newLeaderId, currentLeaderCommit, context.getCommitIndex());
}
@Override
- protected RaftActorBehavior handleAppendEntries(ActorRef sender, AppendEntries appendEntries) {
+ protected RaftActorBehavior handleAppendEntries(final ActorRef sender, final AppendEntries appendEntries) {
int numLogEntries = appendEntries.getEntries() != null ? appendEntries.getEntries().size() : 0;
if (log.isTraceEnabled()) {
return this;
}
- private boolean isOutOfSync(AppendEntries appendEntries) {
+ private boolean isOutOfSync(final AppendEntries appendEntries) {
long prevLogTerm = getLogEntryTerm(appendEntries.getPrevLogIndex());
boolean prevEntryPresent = isLogEntryPresent(appendEntries.getPrevLogIndex());
}
@Override
- protected RaftActorBehavior handleAppendEntriesReply(ActorRef sender,
- AppendEntriesReply appendEntriesReply) {
+ protected RaftActorBehavior handleAppendEntriesReply(final ActorRef sender,
+ final AppendEntriesReply appendEntriesReply) {
return this;
}
@Override
- protected RaftActorBehavior handleRequestVoteReply(ActorRef sender,
- RequestVoteReply requestVoteReply) {
+ protected RaftActorBehavior handleRequestVoteReply(final ActorRef sender,
+ final RequestVoteReply requestVoteReply) {
return this;
}
@Override
- public RaftActorBehavior handleMessage(ActorRef sender, Object message) {
+ public RaftActorBehavior handleMessage(final ActorRef sender, final Object message) {
if (message instanceof ElectionTimeout || message instanceof TimeoutNow) {
return handleElectionTimeout(message);
}
return super.handleMessage(sender, rpc);
}
- private RaftActorBehavior handleElectionTimeout(Object message) {
+ private RaftActorBehavior handleElectionTimeout(final Object message) {
// If the message is ElectionTimeout, verify we haven't actually seen a message from the leader
// during the election timeout interval. It may that the election timer expired b/c this actor
// was busy and messages got delayed, in which case leader messages would be backed up in the
return false;
}
- private void handleInstallSnapshot(final ActorRef sender, InstallSnapshot installSnapshot) {
+ private void handleInstallSnapshot(final ActorRef sender, final InstallSnapshot installSnapshot) {
log.debug("{}: handleInstallSnapshot: {}", logName(), installSnapshot);
import akka.util.Timeout;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
-import com.google.common.collect.Sets;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.text.WordUtils;
import org.opendaylight.controller.cluster.common.actor.AkkaConfigurationReader;
TimeUnit.MILLISECONDS.convert(2, TimeUnit.MINUTES);
public static final int DEFAULT_SHARD_SNAPSHOT_CHUNK_SIZE = 2048000;
- private static final Set<String> GLOBAL_DATASTORE_NAMES = Sets.newConcurrentHashSet();
+ public static final long DEFAULT_SYNC_INDEX_THRESHOLD = 10;
+
+ private static final Set<String> GLOBAL_DATASTORE_NAMES = ConcurrentHashMap.newKeySet();
+
+ private final DefaultConfigParamsImpl raftConfig = new DefaultConfigParamsImpl();
private InMemoryDOMDataStoreConfigProperties dataStoreProperties;
private Duration shardTransactionIdleTimeout = DatastoreContext.DEFAULT_SHARD_TRANSACTION_IDLE_TIMEOUT;
private boolean persistent = DEFAULT_PERSISTENT;
private AkkaConfigurationReader configurationReader = DEFAULT_CONFIGURATION_READER;
private long transactionCreationInitialRateLimit = DEFAULT_TX_CREATION_INITIAL_RATE_LIMIT;
- private final DefaultConfigParamsImpl raftConfig = new DefaultConfigParamsImpl();
private String dataStoreName = UNKNOWN_DATA_STORE_TYPE;
private LogicalDatastoreType logicalStoreType = LogicalDatastoreType.OPERATIONAL;
private YangInstanceIdentifier storeRoot = YangInstanceIdentifier.EMPTY;
setSnapshotDataThresholdPercentage(DEFAULT_SHARD_SNAPSHOT_DATA_THRESHOLD_PERCENTAGE);
setElectionTimeoutFactor(DEFAULT_SHARD_ELECTION_TIMEOUT_FACTOR);
setShardSnapshotChunkSize(DEFAULT_SHARD_SNAPSHOT_CHUNK_SIZE);
+ setSyncIndexThreshold(DEFAULT_SYNC_INDEX_THRESHOLD);
}
private DatastoreContext(final DatastoreContext other) {
setPeerAddressResolver(other.raftConfig.getPeerAddressResolver());
setTempFileDirectory(other.getTempFileDirectory());
setFileBackedStreamingThreshold(other.getFileBackedStreamingThreshold());
+ setSyncIndexThreshold(other.raftConfig.getSyncIndexThreshold());
}
public static Builder newBuilder() {
return new Builder(new DatastoreContext());
}
- public static Builder newBuilderFrom(DatastoreContext context) {
+ public static Builder newBuilderFrom(final DatastoreContext context) {
return new Builder(new DatastoreContext(context));
}
return raftConfig.getTempFileDirectory();
}
- private void setTempFileDirectory(String tempFileDirectory) {
+ private void setTempFileDirectory(final String tempFileDirectory) {
raftConfig.setTempFileDirectory(tempFileDirectory);
}
return raftConfig.getFileBackedStreamingThreshold();
}
- private void setFileBackedStreamingThreshold(int fileBackedStreamingThreshold) {
+ private void setFileBackedStreamingThreshold(final int fileBackedStreamingThreshold) {
raftConfig.setFileBackedStreamingThreshold(fileBackedStreamingThreshold);
}
- private void setPeerAddressResolver(PeerAddressResolver resolver) {
+ private void setPeerAddressResolver(final PeerAddressResolver resolver) {
raftConfig.setPeerAddressResolver(resolver);
}
- private void setHeartbeatInterval(long shardHeartbeatIntervalInMillis) {
+ private void setHeartbeatInterval(final long shardHeartbeatIntervalInMillis) {
raftConfig.setHeartBeatInterval(new FiniteDuration(shardHeartbeatIntervalInMillis,
TimeUnit.MILLISECONDS));
}
- private void setShardJournalRecoveryLogBatchSize(int shardJournalRecoveryLogBatchSize) {
+ private void setShardJournalRecoveryLogBatchSize(final int shardJournalRecoveryLogBatchSize) {
raftConfig.setJournalRecoveryLogBatchSize(shardJournalRecoveryLogBatchSize);
}
- private void setIsolatedLeaderCheckInterval(long shardIsolatedLeaderCheckIntervalInMillis) {
+ private void setIsolatedLeaderCheckInterval(final long shardIsolatedLeaderCheckIntervalInMillis) {
raftConfig.setIsolatedLeaderCheckInterval(
new FiniteDuration(shardIsolatedLeaderCheckIntervalInMillis, TimeUnit.MILLISECONDS));
}
- private void setElectionTimeoutFactor(long shardElectionTimeoutFactor) {
+ private void setElectionTimeoutFactor(final long shardElectionTimeoutFactor) {
raftConfig.setElectionTimeoutFactor(shardElectionTimeoutFactor);
}
- private void setCustomRaftPolicyImplementation(String customRaftPolicyImplementation) {
+ private void setCustomRaftPolicyImplementation(final String customRaftPolicyImplementation) {
raftConfig.setCustomRaftPolicyImplementationClass(customRaftPolicyImplementation);
}
- private void setSnapshotDataThresholdPercentage(int shardSnapshotDataThresholdPercentage) {
+ private void setSnapshotDataThresholdPercentage(final int shardSnapshotDataThresholdPercentage) {
Preconditions.checkArgument(shardSnapshotDataThresholdPercentage >= 0
&& shardSnapshotDataThresholdPercentage <= 100);
raftConfig.setSnapshotDataThresholdPercentage(shardSnapshotDataThresholdPercentage);
}
- private void setSnapshotBatchCount(long shardSnapshotBatchCount) {
+ private void setSnapshotBatchCount(final long shardSnapshotBatchCount) {
raftConfig.setSnapshotBatchCount(shardSnapshotBatchCount);
}
- private void setShardSnapshotChunkSize(int shardSnapshotChunkSize) {
+ private void setShardSnapshotChunkSize(final int shardSnapshotChunkSize) {
raftConfig.setSnapshotChunkSize(shardSnapshotChunkSize);
}
+ private void setSyncIndexThreshold(final long syncIndexThreshold) {
+ raftConfig.setSyncIndexThreshold(syncIndexThreshold);
+ }
+
public int getShardBatchedModificationCount() {
return shardBatchedModificationCount;
}
return raftConfig.getSnapshotChunkSize();
}
- public static class Builder {
+ public static class Builder implements org.opendaylight.yangtools.concepts.Builder<DatastoreContext> {
private final DatastoreContext datastoreContext;
private int maxShardDataChangeExecutorPoolSize =
InMemoryDOMDataStoreConfigProperties.DEFAULT_MAX_DATA_CHANGE_EXECUTOR_POOL_SIZE;
private int maxShardDataStoreExecutorQueueSize =
InMemoryDOMDataStoreConfigProperties.DEFAULT_MAX_DATA_STORE_EXECUTOR_QUEUE_SIZE;
- private Builder(DatastoreContext datastoreContext) {
+ private Builder(final DatastoreContext datastoreContext) {
this.datastoreContext = datastoreContext;
if (datastoreContext.getDataStoreProperties() != null) {
}
}
- public Builder boundedMailboxCapacity(int boundedMailboxCapacity) {
+ public Builder boundedMailboxCapacity(final int boundedMailboxCapacity) {
// TODO - this is defined in the yang DataStoreProperties but not currently used.
return this;
}
- public Builder enableMetricCapture(boolean enableMetricCapture) {
+ public Builder enableMetricCapture(final boolean enableMetricCapture) {
// TODO - this is defined in the yang DataStoreProperties but not currently used.
return this;
}
- public Builder shardTransactionIdleTimeout(long timeout, TimeUnit unit) {
+ public Builder shardTransactionIdleTimeout(final long timeout, final TimeUnit unit) {
datastoreContext.shardTransactionIdleTimeout = Duration.create(timeout, unit);
return this;
}
- public Builder shardTransactionIdleTimeoutInMinutes(long timeout) {
+ public Builder shardTransactionIdleTimeoutInMinutes(final long timeout) {
return shardTransactionIdleTimeout(timeout, TimeUnit.MINUTES);
}
- public Builder operationTimeoutInSeconds(int operationTimeoutInSeconds) {
+ public Builder operationTimeoutInSeconds(final int operationTimeoutInSeconds) {
datastoreContext.operationTimeoutInMillis = TimeUnit.SECONDS.toMillis(operationTimeoutInSeconds);
return this;
}
- public Builder operationTimeoutInMillis(long operationTimeoutInMillis) {
+ public Builder operationTimeoutInMillis(final long operationTimeoutInMillis) {
datastoreContext.operationTimeoutInMillis = operationTimeoutInMillis;
return this;
}
- public Builder dataStoreMXBeanType(String dataStoreMXBeanType) {
+ public Builder dataStoreMXBeanType(final String dataStoreMXBeanType) {
datastoreContext.dataStoreMXBeanType = dataStoreMXBeanType;
return this;
}
- public Builder shardTransactionCommitTimeoutInSeconds(int shardTransactionCommitTimeoutInSeconds) {
+ public Builder shardTransactionCommitTimeoutInSeconds(final int shardTransactionCommitTimeoutInSeconds) {
datastoreContext.shardTransactionCommitTimeoutInSeconds = shardTransactionCommitTimeoutInSeconds;
return this;
}
- public Builder shardJournalRecoveryLogBatchSize(int shardJournalRecoveryLogBatchSize) {
+ public Builder shardJournalRecoveryLogBatchSize(final int shardJournalRecoveryLogBatchSize) {
datastoreContext.setShardJournalRecoveryLogBatchSize(shardJournalRecoveryLogBatchSize);
return this;
}
- public Builder shardSnapshotBatchCount(int shardSnapshotBatchCount) {
+ public Builder shardSnapshotBatchCount(final int shardSnapshotBatchCount) {
datastoreContext.setSnapshotBatchCount(shardSnapshotBatchCount);
return this;
}
- public Builder shardSnapshotDataThresholdPercentage(int shardSnapshotDataThresholdPercentage) {
+ public Builder shardSnapshotDataThresholdPercentage(final int shardSnapshotDataThresholdPercentage) {
datastoreContext.setSnapshotDataThresholdPercentage(shardSnapshotDataThresholdPercentage);
return this;
}
- public Builder shardHeartbeatIntervalInMillis(int shardHeartbeatIntervalInMillis) {
+ public Builder shardHeartbeatIntervalInMillis(final int shardHeartbeatIntervalInMillis) {
datastoreContext.setHeartbeatInterval(shardHeartbeatIntervalInMillis);
return this;
}
- public Builder shardTransactionCommitQueueCapacity(int shardTransactionCommitQueueCapacity) {
+ public Builder shardTransactionCommitQueueCapacity(final int shardTransactionCommitQueueCapacity) {
datastoreContext.shardTransactionCommitQueueCapacity = shardTransactionCommitQueueCapacity;
return this;
}
- public Builder shardInitializationTimeout(long timeout, TimeUnit unit) {
+ public Builder shardInitializationTimeout(final long timeout, final TimeUnit unit) {
datastoreContext.shardInitializationTimeout = new Timeout(timeout, unit);
return this;
}
- public Builder shardInitializationTimeoutInSeconds(long timeout) {
+ public Builder shardInitializationTimeoutInSeconds(final long timeout) {
return shardInitializationTimeout(timeout, TimeUnit.SECONDS);
}
- public Builder shardLeaderElectionTimeout(long timeout, TimeUnit unit) {
+ public Builder shardLeaderElectionTimeout(final long timeout, final TimeUnit unit) {
datastoreContext.shardLeaderElectionTimeout = new Timeout(timeout, unit);
return this;
}
- public Builder shardLeaderElectionTimeoutInSeconds(long timeout) {
+ public Builder shardLeaderElectionTimeoutInSeconds(final long timeout) {
return shardLeaderElectionTimeout(timeout, TimeUnit.SECONDS);
}
- public Builder configurationReader(AkkaConfigurationReader configurationReader) {
+ public Builder configurationReader(final AkkaConfigurationReader configurationReader) {
datastoreContext.configurationReader = configurationReader;
return this;
}
- public Builder persistent(boolean persistent) {
+ public Builder persistent(final boolean persistent) {
datastoreContext.persistent = persistent;
return this;
}
- public Builder shardIsolatedLeaderCheckIntervalInMillis(int shardIsolatedLeaderCheckIntervalInMillis) {
+ public Builder shardIsolatedLeaderCheckIntervalInMillis(final int shardIsolatedLeaderCheckIntervalInMillis) {
datastoreContext.setIsolatedLeaderCheckInterval(shardIsolatedLeaderCheckIntervalInMillis);
return this;
}
- public Builder shardElectionTimeoutFactor(long shardElectionTimeoutFactor) {
+ public Builder shardElectionTimeoutFactor(final long shardElectionTimeoutFactor) {
datastoreContext.setElectionTimeoutFactor(shardElectionTimeoutFactor);
return this;
}
- public Builder transactionCreationInitialRateLimit(long initialRateLimit) {
+ public Builder transactionCreationInitialRateLimit(final long initialRateLimit) {
datastoreContext.transactionCreationInitialRateLimit = initialRateLimit;
return this;
}
- public Builder logicalStoreType(LogicalDatastoreType logicalStoreType) {
+ public Builder logicalStoreType(final LogicalDatastoreType logicalStoreType) {
datastoreContext.logicalStoreType = Preconditions.checkNotNull(logicalStoreType);
// Retain compatible naming
return this;
}
- public Builder dataStoreName(String dataStoreName) {
+ public Builder dataStoreName(final String dataStoreName) {
datastoreContext.dataStoreName = Preconditions.checkNotNull(dataStoreName);
datastoreContext.dataStoreMXBeanType = "Distributed" + WordUtils.capitalize(dataStoreName) + "Datastore";
return this;
return this;
}
- public Builder writeOnlyTransactionOptimizationsEnabled(boolean value) {
+ public Builder writeOnlyTransactionOptimizationsEnabled(final boolean value) {
datastoreContext.writeOnlyTransactionOptimizationsEnabled = value;
return this;
}
- public Builder shardCommitQueueExpiryTimeoutInMillis(long value) {
+ public Builder shardCommitQueueExpiryTimeoutInMillis(final long value) {
datastoreContext.shardCommitQueueExpiryTimeoutInMillis = value;
return this;
}
- public Builder shardCommitQueueExpiryTimeoutInSeconds(long value) {
+ public Builder shardCommitQueueExpiryTimeoutInSeconds(final long value) {
datastoreContext.shardCommitQueueExpiryTimeoutInMillis = TimeUnit.MILLISECONDS.convert(
value, TimeUnit.SECONDS);
return this;
}
- public Builder transactionDebugContextEnabled(boolean value) {
+ public Builder transactionDebugContextEnabled(final boolean value) {
datastoreContext.transactionDebugContextEnabled = value;
return this;
}
- public Builder maxShardDataChangeExecutorPoolSize(int maxShardDataChangeExecutorPoolSize) {
+ public Builder maxShardDataChangeExecutorPoolSize(final int maxShardDataChangeExecutorPoolSize) {
this.maxShardDataChangeExecutorPoolSize = maxShardDataChangeExecutorPoolSize;
return this;
}
- public Builder maxShardDataChangeExecutorQueueSize(int maxShardDataChangeExecutorQueueSize) {
+ public Builder maxShardDataChangeExecutorQueueSize(final int maxShardDataChangeExecutorQueueSize) {
this.maxShardDataChangeExecutorQueueSize = maxShardDataChangeExecutorQueueSize;
return this;
}
- public Builder maxShardDataChangeListenerQueueSize(int maxShardDataChangeListenerQueueSize) {
+ public Builder maxShardDataChangeListenerQueueSize(final int maxShardDataChangeListenerQueueSize) {
this.maxShardDataChangeListenerQueueSize = maxShardDataChangeListenerQueueSize;
return this;
}
- public Builder maxShardDataStoreExecutorQueueSize(int maxShardDataStoreExecutorQueueSize) {
+ public Builder maxShardDataStoreExecutorQueueSize(final int maxShardDataStoreExecutorQueueSize) {
this.maxShardDataStoreExecutorQueueSize = maxShardDataStoreExecutorQueueSize;
return this;
}
- public Builder useTellBasedProtocol(boolean value) {
+ public Builder useTellBasedProtocol(final boolean value) {
datastoreContext.useTellBasedProtocol = value;
return this;
}
* For unit tests only.
*/
@VisibleForTesting
- public Builder shardManagerPersistenceId(String id) {
+ public Builder shardManagerPersistenceId(final String id) {
datastoreContext.shardManagerPersistenceId = id;
return this;
}
- public DatastoreContext build() {
- datastoreContext.dataStoreProperties = InMemoryDOMDataStoreConfigProperties.create(
- maxShardDataChangeExecutorPoolSize, maxShardDataChangeExecutorQueueSize,
- maxShardDataChangeListenerQueueSize, maxShardDataStoreExecutorQueueSize);
-
- if (datastoreContext.dataStoreName != null) {
- GLOBAL_DATASTORE_NAMES.add(datastoreContext.dataStoreName);
- }
-
- return datastoreContext;
- }
-
- public Builder customRaftPolicyImplementation(String customRaftPolicyImplementation) {
+ public Builder customRaftPolicyImplementation(final String customRaftPolicyImplementation) {
datastoreContext.setCustomRaftPolicyImplementation(customRaftPolicyImplementation);
return this;
}
- public Builder shardSnapshotChunkSize(int shardSnapshotChunkSize) {
+ public Builder shardSnapshotChunkSize(final int shardSnapshotChunkSize) {
datastoreContext.setShardSnapshotChunkSize(shardSnapshotChunkSize);
return this;
}
- public Builder shardPeerAddressResolver(PeerAddressResolver resolver) {
+ public Builder shardPeerAddressResolver(final PeerAddressResolver resolver) {
datastoreContext.setPeerAddressResolver(resolver);
return this;
}
- public Builder tempFileDirectory(String tempFileDirectory) {
+ public Builder tempFileDirectory(final String tempFileDirectory) {
datastoreContext.setTempFileDirectory(tempFileDirectory);
return this;
}
- public Builder fileBackedStreamingThresholdInMegabytes(int fileBackedStreamingThreshold) {
+ public Builder fileBackedStreamingThresholdInMegabytes(final int fileBackedStreamingThreshold) {
datastoreContext.setFileBackedStreamingThreshold(fileBackedStreamingThreshold * ConfigParams.MEGABYTE);
return this;
}
+
+ public Builder syncIndexThreshold(final long syncIndexThreshold) {
+ datastoreContext.setSyncIndexThreshold(syncIndexThreshold);
+ return this;
+ }
+
+ @Override
+ public DatastoreContext build() {
+ datastoreContext.dataStoreProperties = InMemoryDOMDataStoreConfigProperties.create(
+ maxShardDataChangeExecutorPoolSize, maxShardDataChangeExecutorQueueSize,
+ maxShardDataChangeListenerQueueSize, maxShardDataStoreExecutorQueueSize);
+
+ if (datastoreContext.dataStoreName != null) {
+ GLOBAL_DATASTORE_NAMES.add(datastoreContext.dataStoreName);
+ }
+
+ return datastoreContext;
+ }
}
}