From b25ae9347455b1bae8f25424a9ceffc017f2f0db Mon Sep 17 00:00:00 2001 From: Robert Varga Date: Thu, 15 Jun 2017 03:13:47 +0200 Subject: [PATCH] BUG-8618: make sync threshold tuneable We are observing quite a few of these transitions, which may be coming from batching scenarios. Introduce sync-index-threshold config knob to expose control over it. Change-Id: Ief4c89c2fe5b95cebaf3fb83cbcdda37cac126b6 Signed-off-by: Robert Varga (cherry picked from commit 890e4bbf40aee318a2174bd4130cf34437e5617b) --- .../controller/cluster/raft/ConfigParams.java | 8 + .../cluster/raft/DefaultConfigParamsImpl.java | 84 +++++----- .../cluster/raft/behaviors/Follower.java | 34 ++-- .../src/main/resources/initial/datastore.cfg | 4 + .../cluster/datastore/DatastoreContext.java | 151 ++++++++++-------- ...tributedConfigDataStoreProviderModule.java | 16 +- ...tedOperationalDataStoreProviderModule.java | 17 +- .../yang/distributed-datastore-provider.yang | 8 + 8 files changed, 185 insertions(+), 137 deletions(-) diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ConfigParams.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ConfigParams.java index 86ce3113fa..8d6b965d9b 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ConfigParams.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ConfigParams.java @@ -131,4 +131,12 @@ public interface ConfigParams { * @return the threshold in terms of number of bytes. */ int getFileBackedStreamingThreshold(); + + /** + * Returns the threshold in terms of number journal entries that we can lag behind a leader until we raise a + * 'not synced' transition. + * + * @return the threshold in terms of number of journal entries. + */ + long getSyncIndexThreshold(); } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/DefaultConfigParamsImpl.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/DefaultConfigParamsImpl.java index 56fb633672..64502858a7 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/DefaultConfigParamsImpl.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/DefaultConfigParamsImpl.java @@ -49,6 +49,8 @@ public class DefaultConfigParamsImpl implements ConfigParams { public static final FiniteDuration HEART_BEAT_INTERVAL = new FiniteDuration(100, TimeUnit.MILLISECONDS); + private final Supplier policySupplier = Suppliers.memoize(this::getPolicy); + private FiniteDuration heartBeatInterval = HEART_BEAT_INTERVAL; private long snapshotBatchCount = SNAPSHOT_BATCH_COUNT; private int journalRecoveryLogBatchSize = JOURNAL_RECOVERY_LOG_BATCH_SIZE; @@ -64,53 +66,53 @@ public class DefaultConfigParamsImpl implements ConfigParams { private long electionTimeoutFactor = 2; private String customRaftPolicyImplementationClass; - private final Supplier policySupplier = Suppliers.memoize(new PolicySupplier()); - private PeerAddressResolver peerAddressResolver = NoopPeerAddressResolver.INSTANCE; private String tempFileDirectory = ""; private int fileBackedStreamingThreshold = 128 * MEGABYTE; - public void setHeartBeatInterval(FiniteDuration heartBeatInterval) { + private long syncIndexThreshold = 10; + + public void setHeartBeatInterval(final FiniteDuration heartBeatInterval) { this.heartBeatInterval = heartBeatInterval; electionTimeOutInterval = null; } - public void setSnapshotBatchCount(long snapshotBatchCount) { + public void setSnapshotBatchCount(final long snapshotBatchCount) { this.snapshotBatchCount = snapshotBatchCount; } - public void setSnapshotDataThresholdPercentage(int snapshotDataThresholdPercentage) { + public void setSnapshotDataThresholdPercentage(final int snapshotDataThresholdPercentage) { this.snapshotDataThresholdPercentage = snapshotDataThresholdPercentage; } - public void setSnapshotChunkSize(int snapshotChunkSize) { + public void setSnapshotChunkSize(final int snapshotChunkSize) { this.snapshotChunkSize = snapshotChunkSize; } - public void setJournalRecoveryLogBatchSize(int journalRecoveryLogBatchSize) { + public void setJournalRecoveryLogBatchSize(final int journalRecoveryLogBatchSize) { this.journalRecoveryLogBatchSize = journalRecoveryLogBatchSize; } - public void setIsolatedLeaderCheckInterval(FiniteDuration isolatedLeaderCheckInterval) { + public void setIsolatedLeaderCheckInterval(final FiniteDuration isolatedLeaderCheckInterval) { this.isolatedLeaderCheckInterval = isolatedLeaderCheckInterval.toMillis(); } - public void setElectionTimeoutFactor(long electionTimeoutFactor) { + public void setElectionTimeoutFactor(final long electionTimeoutFactor) { this.electionTimeoutFactor = electionTimeoutFactor; electionTimeOutInterval = null; } - public void setTempFileDirectory(String tempFileDirectory) { + public void setTempFileDirectory(final String tempFileDirectory) { this.tempFileDirectory = tempFileDirectory; } - public void setFileBackedStreamingThreshold(int fileBackedStreamingThreshold) { + public void setFileBackedStreamingThreshold(final int fileBackedStreamingThreshold) { this.fileBackedStreamingThreshold = fileBackedStreamingThreshold; } - public void setCustomRaftPolicyImplementationClass(String customRaftPolicyImplementationClass) { + public void setCustomRaftPolicyImplementationClass(final String customRaftPolicyImplementationClass) { this.customRaftPolicyImplementationClass = customRaftPolicyImplementationClass; } @@ -184,37 +186,45 @@ public class DefaultConfigParamsImpl implements ConfigParams { return fileBackedStreamingThreshold; } - private class PolicySupplier implements Supplier { - @Override - @SuppressWarnings("checkstyle:IllegalCatch") - public RaftPolicy get() { - if (Strings.isNullOrEmpty(DefaultConfigParamsImpl.this.customRaftPolicyImplementationClass)) { - LOG.debug("No custom RaftPolicy specified. Using DefaultRaftPolicy"); - return DefaultRaftPolicy.INSTANCE; - } - - try { - String className = DefaultConfigParamsImpl.this.customRaftPolicyImplementationClass; - LOG.info("Trying to use custom RaftPolicy {}", className); - return (RaftPolicy)Class.forName(className).newInstance(); - } catch (Exception e) { - if (LOG.isDebugEnabled()) { - LOG.error("Could not create custom raft policy, will stick with default", e); - } else { - LOG.error("Could not create custom raft policy, will stick with default : cause = {}", - e.getMessage()); - } - } - return DefaultRaftPolicy.INSTANCE; - } - } @Override public PeerAddressResolver getPeerAddressResolver() { return peerAddressResolver; } - public void setPeerAddressResolver(@Nonnull PeerAddressResolver peerAddressResolver) { + public void setPeerAddressResolver(@Nonnull final PeerAddressResolver peerAddressResolver) { this.peerAddressResolver = Preconditions.checkNotNull(peerAddressResolver); } + + @Override + public long getSyncIndexThreshold() { + return syncIndexThreshold; + } + + public void setSyncIndexThreshold(final long syncIndexThreshold) { + Preconditions.checkArgument(syncIndexThreshold >= 0); + this.syncIndexThreshold = syncIndexThreshold; + } + + @SuppressWarnings("checkstyle:IllegalCatch") + private RaftPolicy getPolicy() { + if (Strings.isNullOrEmpty(DefaultConfigParamsImpl.this.customRaftPolicyImplementationClass)) { + LOG.debug("No custom RaftPolicy specified. Using DefaultRaftPolicy"); + return DefaultRaftPolicy.INSTANCE; + } + + try { + String className = DefaultConfigParamsImpl.this.customRaftPolicyImplementationClass; + LOG.info("Trying to use custom RaftPolicy {}", className); + return (RaftPolicy)Class.forName(className).newInstance(); + } catch (Exception e) { + if (LOG.isDebugEnabled()) { + LOG.error("Could not create custom raft policy, will stick with default", e); + } else { + LOG.error("Could not create custom raft policy, will stick with default : cause = {}", + e.getMessage()); + } + } + return DefaultRaftPolicy.INSTANCE; + } } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java index b512089692..c35de820db 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java @@ -51,8 +51,6 @@ import org.opendaylight.controller.cluster.raft.persisted.Snapshot; * */ public class Follower extends AbstractRaftActorBehavior { - private static final int SYNC_THRESHOLD = 10; - private static final long MAX_ELECTION_TIMEOUT_FACTOR = 18; private final SyncStatusTracker initialSyncStatusTracker; @@ -62,16 +60,18 @@ public class Follower extends AbstractRaftActorBehavior { private String leaderId; private short leaderPayloadVersion; - public Follower(RaftActorContext context) { + public Follower(final RaftActorContext context) { this(context, null, (short)-1); } - public Follower(RaftActorContext context, String initialLeaderId, short initialLeaderPayloadVersion) { + public Follower(final RaftActorContext context, final String initialLeaderId, + final short initialLeaderPayloadVersion) { super(context, RaftState.Follower); this.leaderId = initialLeaderId; this.leaderPayloadVersion = initialLeaderPayloadVersion; - initialSyncStatusTracker = new SyncStatusTracker(context.getActor(), getId(), SYNC_THRESHOLD); + initialSyncStatusTracker = new SyncStatusTracker(context.getActor(), getId(), context.getConfigParams() + .getSyncIndexThreshold()); if (context.getPeerIds().isEmpty() && getLeaderId() == null) { actor().tell(TimeoutNow.INSTANCE, actor()); @@ -96,7 +96,7 @@ public class Follower extends AbstractRaftActorBehavior { } @VisibleForTesting - protected final void setLeaderPayloadVersion(short leaderPayloadVersion) { + protected final void setLeaderPayloadVersion(final short leaderPayloadVersion) { this.leaderPayloadVersion = leaderPayloadVersion; } @@ -108,7 +108,7 @@ public class Follower extends AbstractRaftActorBehavior { lastLeaderMessageTimer.start(); } - private boolean isLogEntryPresent(long index) { + private boolean isLogEntryPresent(final long index) { if (context.getReplicatedLog().isInSnapshot(index)) { return true; } @@ -118,12 +118,12 @@ public class Follower extends AbstractRaftActorBehavior { } - private void updateInitialSyncStatus(long currentLeaderCommit, String newLeaderId) { + private void updateInitialSyncStatus(final long currentLeaderCommit, final String newLeaderId) { initialSyncStatusTracker.update(newLeaderId, currentLeaderCommit, context.getCommitIndex()); } @Override - protected RaftActorBehavior handleAppendEntries(ActorRef sender, AppendEntries appendEntries) { + protected RaftActorBehavior handleAppendEntries(final ActorRef sender, final AppendEntries appendEntries) { int numLogEntries = appendEntries.getEntries() != null ? appendEntries.getEntries().size() : 0; if (log.isTraceEnabled()) { @@ -317,7 +317,7 @@ public class Follower extends AbstractRaftActorBehavior { return this; } - private boolean isOutOfSync(AppendEntries appendEntries) { + private boolean isOutOfSync(final AppendEntries appendEntries) { long prevLogTerm = getLogEntryTerm(appendEntries.getPrevLogIndex()); boolean prevEntryPresent = isLogEntryPresent(appendEntries.getPrevLogIndex()); @@ -371,19 +371,19 @@ public class Follower extends AbstractRaftActorBehavior { } @Override - protected RaftActorBehavior handleAppendEntriesReply(ActorRef sender, - AppendEntriesReply appendEntriesReply) { + protected RaftActorBehavior handleAppendEntriesReply(final ActorRef sender, + final AppendEntriesReply appendEntriesReply) { return this; } @Override - protected RaftActorBehavior handleRequestVoteReply(ActorRef sender, - RequestVoteReply requestVoteReply) { + protected RaftActorBehavior handleRequestVoteReply(final ActorRef sender, + final RequestVoteReply requestVoteReply) { return this; } @Override - public RaftActorBehavior handleMessage(ActorRef sender, Object message) { + public RaftActorBehavior handleMessage(final ActorRef sender, final Object message) { if (message instanceof ElectionTimeout || message instanceof TimeoutNow) { return handleElectionTimeout(message); } @@ -419,7 +419,7 @@ public class Follower extends AbstractRaftActorBehavior { return super.handleMessage(sender, rpc); } - private RaftActorBehavior handleElectionTimeout(Object message) { + private RaftActorBehavior handleElectionTimeout(final Object message) { // If the message is ElectionTimeout, verify we haven't actually seen a message from the leader // during the election timeout interval. It may that the election timer expired b/c this actor // was busy and messages got delayed, in which case leader messages would be backed up in the @@ -512,7 +512,7 @@ public class Follower extends AbstractRaftActorBehavior { return false; } - private void handleInstallSnapshot(final ActorRef sender, InstallSnapshot installSnapshot) { + private void handleInstallSnapshot(final ActorRef sender, final InstallSnapshot installSnapshot) { log.debug("{}: handleInstallSnapshot: {}", logName(), installSnapshot); diff --git a/opendaylight/md-sal/sal-clustering-config/src/main/resources/initial/datastore.cfg b/opendaylight/md-sal/sal-clustering-config/src/main/resources/initial/datastore.cfg index 6d4d3b404e..3ab714a9ac 100644 --- a/opendaylight/md-sal/sal-clustering-config/src/main/resources/initial/datastore.cfg +++ b/opendaylight/md-sal/sal-clustering-config/src/main/resources/initial/datastore.cfg @@ -84,3 +84,7 @@ operational.persistent=false # Enable tell-based protocol between frontend (applications) and backend (shards). Using this protocol # should avoid AskTimeoutExceptions seen under heavy load. Defaults to false (use ask-based protocol). #use-tell-based-protocol=true + +# Tune the maximum number of entries a follower is allowed to lag behind the leader before it is +# considered out-of-sync. This flag may require tuning in face of a large number of small transactions. +#sync-index-threshold=10 diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContext.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContext.java index eeb6ad3155..02f2768fbb 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContext.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContext.java @@ -11,8 +11,8 @@ package org.opendaylight.controller.cluster.datastore; import akka.util.Timeout; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; -import com.google.common.collect.Sets; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import org.apache.commons.lang3.text.WordUtils; import org.opendaylight.controller.cluster.common.actor.AkkaConfigurationReader; @@ -56,7 +56,11 @@ public class DatastoreContext { TimeUnit.MILLISECONDS.convert(2, TimeUnit.MINUTES); public static final int DEFAULT_SHARD_SNAPSHOT_CHUNK_SIZE = 2048000; - private static final Set GLOBAL_DATASTORE_NAMES = Sets.newConcurrentHashSet(); + public static final long DEFAULT_SYNC_INDEX_THRESHOLD = 10; + + private static final Set GLOBAL_DATASTORE_NAMES = ConcurrentHashMap.newKeySet(); + + private final DefaultConfigParamsImpl raftConfig = new DefaultConfigParamsImpl(); private InMemoryDOMDataStoreConfigProperties dataStoreProperties; private Duration shardTransactionIdleTimeout = DatastoreContext.DEFAULT_SHARD_TRANSACTION_IDLE_TIMEOUT; @@ -69,7 +73,6 @@ public class DatastoreContext { private boolean persistent = DEFAULT_PERSISTENT; private AkkaConfigurationReader configurationReader = DEFAULT_CONFIGURATION_READER; private long transactionCreationInitialRateLimit = DEFAULT_TX_CREATION_INITIAL_RATE_LIMIT; - private final DefaultConfigParamsImpl raftConfig = new DefaultConfigParamsImpl(); private String dataStoreName = UNKNOWN_DATA_STORE_TYPE; private LogicalDatastoreType logicalStoreType = LogicalDatastoreType.OPERATIONAL; private YangInstanceIdentifier storeRoot = YangInstanceIdentifier.EMPTY; @@ -92,6 +95,7 @@ public class DatastoreContext { setSnapshotDataThresholdPercentage(DEFAULT_SHARD_SNAPSHOT_DATA_THRESHOLD_PERCENTAGE); setElectionTimeoutFactor(DEFAULT_SHARD_ELECTION_TIMEOUT_FACTOR); setShardSnapshotChunkSize(DEFAULT_SHARD_SNAPSHOT_CHUNK_SIZE); + setSyncIndexThreshold(DEFAULT_SYNC_INDEX_THRESHOLD); } private DatastoreContext(final DatastoreContext other) { @@ -127,13 +131,14 @@ public class DatastoreContext { setPeerAddressResolver(other.raftConfig.getPeerAddressResolver()); setTempFileDirectory(other.getTempFileDirectory()); setFileBackedStreamingThreshold(other.getFileBackedStreamingThreshold()); + setSyncIndexThreshold(other.raftConfig.getSyncIndexThreshold()); } public static Builder newBuilder() { return new Builder(new DatastoreContext()); } - public static Builder newBuilderFrom(DatastoreContext context) { + public static Builder newBuilderFrom(final DatastoreContext context) { return new Builder(new DatastoreContext(context)); } @@ -209,7 +214,7 @@ public class DatastoreContext { return raftConfig.getTempFileDirectory(); } - private void setTempFileDirectory(String tempFileDirectory) { + private void setTempFileDirectory(final String tempFileDirectory) { raftConfig.setTempFileDirectory(tempFileDirectory); } @@ -217,51 +222,55 @@ public class DatastoreContext { return raftConfig.getFileBackedStreamingThreshold(); } - private void setFileBackedStreamingThreshold(int fileBackedStreamingThreshold) { + private void setFileBackedStreamingThreshold(final int fileBackedStreamingThreshold) { raftConfig.setFileBackedStreamingThreshold(fileBackedStreamingThreshold); } - private void setPeerAddressResolver(PeerAddressResolver resolver) { + private void setPeerAddressResolver(final PeerAddressResolver resolver) { raftConfig.setPeerAddressResolver(resolver); } - private void setHeartbeatInterval(long shardHeartbeatIntervalInMillis) { + private void setHeartbeatInterval(final long shardHeartbeatIntervalInMillis) { raftConfig.setHeartBeatInterval(new FiniteDuration(shardHeartbeatIntervalInMillis, TimeUnit.MILLISECONDS)); } - private void setShardJournalRecoveryLogBatchSize(int shardJournalRecoveryLogBatchSize) { + private void setShardJournalRecoveryLogBatchSize(final int shardJournalRecoveryLogBatchSize) { raftConfig.setJournalRecoveryLogBatchSize(shardJournalRecoveryLogBatchSize); } - private void setIsolatedLeaderCheckInterval(long shardIsolatedLeaderCheckIntervalInMillis) { + private void setIsolatedLeaderCheckInterval(final long shardIsolatedLeaderCheckIntervalInMillis) { raftConfig.setIsolatedLeaderCheckInterval( new FiniteDuration(shardIsolatedLeaderCheckIntervalInMillis, TimeUnit.MILLISECONDS)); } - private void setElectionTimeoutFactor(long shardElectionTimeoutFactor) { + private void setElectionTimeoutFactor(final long shardElectionTimeoutFactor) { raftConfig.setElectionTimeoutFactor(shardElectionTimeoutFactor); } - private void setCustomRaftPolicyImplementation(String customRaftPolicyImplementation) { + private void setCustomRaftPolicyImplementation(final String customRaftPolicyImplementation) { raftConfig.setCustomRaftPolicyImplementationClass(customRaftPolicyImplementation); } - private void setSnapshotDataThresholdPercentage(int shardSnapshotDataThresholdPercentage) { + private void setSnapshotDataThresholdPercentage(final int shardSnapshotDataThresholdPercentage) { Preconditions.checkArgument(shardSnapshotDataThresholdPercentage >= 0 && shardSnapshotDataThresholdPercentage <= 100); raftConfig.setSnapshotDataThresholdPercentage(shardSnapshotDataThresholdPercentage); } - private void setSnapshotBatchCount(long shardSnapshotBatchCount) { + private void setSnapshotBatchCount(final long shardSnapshotBatchCount) { raftConfig.setSnapshotBatchCount(shardSnapshotBatchCount); } - private void setShardSnapshotChunkSize(int shardSnapshotChunkSize) { + private void setShardSnapshotChunkSize(final int shardSnapshotChunkSize) { raftConfig.setSnapshotChunkSize(shardSnapshotChunkSize); } + private void setSyncIndexThreshold(final long syncIndexThreshold) { + raftConfig.setSyncIndexThreshold(syncIndexThreshold); + } + public int getShardBatchedModificationCount() { return shardBatchedModificationCount; } @@ -286,7 +295,7 @@ public class DatastoreContext { return raftConfig.getSnapshotChunkSize(); } - public static class Builder { + public static class Builder implements org.opendaylight.yangtools.concepts.Builder { private final DatastoreContext datastoreContext; private int maxShardDataChangeExecutorPoolSize = InMemoryDOMDataStoreConfigProperties.DEFAULT_MAX_DATA_CHANGE_EXECUTOR_POOL_SIZE; @@ -297,7 +306,7 @@ public class DatastoreContext { private int maxShardDataStoreExecutorQueueSize = InMemoryDOMDataStoreConfigProperties.DEFAULT_MAX_DATA_STORE_EXECUTOR_QUEUE_SIZE; - private Builder(DatastoreContext datastoreContext) { + private Builder(final DatastoreContext datastoreContext) { this.datastoreContext = datastoreContext; if (datastoreContext.getDataStoreProperties() != null) { @@ -312,115 +321,115 @@ public class DatastoreContext { } } - public Builder boundedMailboxCapacity(int boundedMailboxCapacity) { + public Builder boundedMailboxCapacity(final int boundedMailboxCapacity) { // TODO - this is defined in the yang DataStoreProperties but not currently used. return this; } - public Builder enableMetricCapture(boolean enableMetricCapture) { + public Builder enableMetricCapture(final boolean enableMetricCapture) { // TODO - this is defined in the yang DataStoreProperties but not currently used. return this; } - public Builder shardTransactionIdleTimeout(long timeout, TimeUnit unit) { + public Builder shardTransactionIdleTimeout(final long timeout, final TimeUnit unit) { datastoreContext.shardTransactionIdleTimeout = Duration.create(timeout, unit); return this; } - public Builder shardTransactionIdleTimeoutInMinutes(long timeout) { + public Builder shardTransactionIdleTimeoutInMinutes(final long timeout) { return shardTransactionIdleTimeout(timeout, TimeUnit.MINUTES); } - public Builder operationTimeoutInSeconds(int operationTimeoutInSeconds) { + public Builder operationTimeoutInSeconds(final int operationTimeoutInSeconds) { datastoreContext.operationTimeoutInMillis = TimeUnit.SECONDS.toMillis(operationTimeoutInSeconds); return this; } - public Builder operationTimeoutInMillis(long operationTimeoutInMillis) { + public Builder operationTimeoutInMillis(final long operationTimeoutInMillis) { datastoreContext.operationTimeoutInMillis = operationTimeoutInMillis; return this; } - public Builder dataStoreMXBeanType(String dataStoreMXBeanType) { + public Builder dataStoreMXBeanType(final String dataStoreMXBeanType) { datastoreContext.dataStoreMXBeanType = dataStoreMXBeanType; return this; } - public Builder shardTransactionCommitTimeoutInSeconds(int shardTransactionCommitTimeoutInSeconds) { + public Builder shardTransactionCommitTimeoutInSeconds(final int shardTransactionCommitTimeoutInSeconds) { datastoreContext.shardTransactionCommitTimeoutInSeconds = shardTransactionCommitTimeoutInSeconds; return this; } - public Builder shardJournalRecoveryLogBatchSize(int shardJournalRecoveryLogBatchSize) { + public Builder shardJournalRecoveryLogBatchSize(final int shardJournalRecoveryLogBatchSize) { datastoreContext.setShardJournalRecoveryLogBatchSize(shardJournalRecoveryLogBatchSize); return this; } - public Builder shardSnapshotBatchCount(int shardSnapshotBatchCount) { + public Builder shardSnapshotBatchCount(final int shardSnapshotBatchCount) { datastoreContext.setSnapshotBatchCount(shardSnapshotBatchCount); return this; } - public Builder shardSnapshotDataThresholdPercentage(int shardSnapshotDataThresholdPercentage) { + public Builder shardSnapshotDataThresholdPercentage(final int shardSnapshotDataThresholdPercentage) { datastoreContext.setSnapshotDataThresholdPercentage(shardSnapshotDataThresholdPercentage); return this; } - public Builder shardHeartbeatIntervalInMillis(int shardHeartbeatIntervalInMillis) { + public Builder shardHeartbeatIntervalInMillis(final int shardHeartbeatIntervalInMillis) { datastoreContext.setHeartbeatInterval(shardHeartbeatIntervalInMillis); return this; } - public Builder shardTransactionCommitQueueCapacity(int shardTransactionCommitQueueCapacity) { + public Builder shardTransactionCommitQueueCapacity(final int shardTransactionCommitQueueCapacity) { datastoreContext.shardTransactionCommitQueueCapacity = shardTransactionCommitQueueCapacity; return this; } - public Builder shardInitializationTimeout(long timeout, TimeUnit unit) { + public Builder shardInitializationTimeout(final long timeout, final TimeUnit unit) { datastoreContext.shardInitializationTimeout = new Timeout(timeout, unit); return this; } - public Builder shardInitializationTimeoutInSeconds(long timeout) { + public Builder shardInitializationTimeoutInSeconds(final long timeout) { return shardInitializationTimeout(timeout, TimeUnit.SECONDS); } - public Builder shardLeaderElectionTimeout(long timeout, TimeUnit unit) { + public Builder shardLeaderElectionTimeout(final long timeout, final TimeUnit unit) { datastoreContext.shardLeaderElectionTimeout = new Timeout(timeout, unit); return this; } - public Builder shardLeaderElectionTimeoutInSeconds(long timeout) { + public Builder shardLeaderElectionTimeoutInSeconds(final long timeout) { return shardLeaderElectionTimeout(timeout, TimeUnit.SECONDS); } - public Builder configurationReader(AkkaConfigurationReader configurationReader) { + public Builder configurationReader(final AkkaConfigurationReader configurationReader) { datastoreContext.configurationReader = configurationReader; return this; } - public Builder persistent(boolean persistent) { + public Builder persistent(final boolean persistent) { datastoreContext.persistent = persistent; return this; } - public Builder shardIsolatedLeaderCheckIntervalInMillis(int shardIsolatedLeaderCheckIntervalInMillis) { + public Builder shardIsolatedLeaderCheckIntervalInMillis(final int shardIsolatedLeaderCheckIntervalInMillis) { datastoreContext.setIsolatedLeaderCheckInterval(shardIsolatedLeaderCheckIntervalInMillis); return this; } - public Builder shardElectionTimeoutFactor(long shardElectionTimeoutFactor) { + public Builder shardElectionTimeoutFactor(final long shardElectionTimeoutFactor) { datastoreContext.setElectionTimeoutFactor(shardElectionTimeoutFactor); return this; } - public Builder transactionCreationInitialRateLimit(long initialRateLimit) { + public Builder transactionCreationInitialRateLimit(final long initialRateLimit) { datastoreContext.transactionCreationInitialRateLimit = initialRateLimit; return this; } - public Builder logicalStoreType(LogicalDatastoreType logicalStoreType) { + public Builder logicalStoreType(final LogicalDatastoreType logicalStoreType) { datastoreContext.logicalStoreType = Preconditions.checkNotNull(logicalStoreType); // Retain compatible naming @@ -443,7 +452,7 @@ public class DatastoreContext { return this; } - public Builder dataStoreName(String dataStoreName) { + public Builder dataStoreName(final String dataStoreName) { datastoreContext.dataStoreName = Preconditions.checkNotNull(dataStoreName); datastoreContext.dataStoreMXBeanType = "Distributed" + WordUtils.capitalize(dataStoreName) + "Datastore"; return this; @@ -454,48 +463,48 @@ public class DatastoreContext { return this; } - public Builder writeOnlyTransactionOptimizationsEnabled(boolean value) { + public Builder writeOnlyTransactionOptimizationsEnabled(final boolean value) { datastoreContext.writeOnlyTransactionOptimizationsEnabled = value; return this; } - public Builder shardCommitQueueExpiryTimeoutInMillis(long value) { + public Builder shardCommitQueueExpiryTimeoutInMillis(final long value) { datastoreContext.shardCommitQueueExpiryTimeoutInMillis = value; return this; } - public Builder shardCommitQueueExpiryTimeoutInSeconds(long value) { + public Builder shardCommitQueueExpiryTimeoutInSeconds(final long value) { datastoreContext.shardCommitQueueExpiryTimeoutInMillis = TimeUnit.MILLISECONDS.convert( value, TimeUnit.SECONDS); return this; } - public Builder transactionDebugContextEnabled(boolean value) { + public Builder transactionDebugContextEnabled(final boolean value) { datastoreContext.transactionDebugContextEnabled = value; return this; } - public Builder maxShardDataChangeExecutorPoolSize(int maxShardDataChangeExecutorPoolSize) { + public Builder maxShardDataChangeExecutorPoolSize(final int maxShardDataChangeExecutorPoolSize) { this.maxShardDataChangeExecutorPoolSize = maxShardDataChangeExecutorPoolSize; return this; } - public Builder maxShardDataChangeExecutorQueueSize(int maxShardDataChangeExecutorQueueSize) { + public Builder maxShardDataChangeExecutorQueueSize(final int maxShardDataChangeExecutorQueueSize) { this.maxShardDataChangeExecutorQueueSize = maxShardDataChangeExecutorQueueSize; return this; } - public Builder maxShardDataChangeListenerQueueSize(int maxShardDataChangeListenerQueueSize) { + public Builder maxShardDataChangeListenerQueueSize(final int maxShardDataChangeListenerQueueSize) { this.maxShardDataChangeListenerQueueSize = maxShardDataChangeListenerQueueSize; return this; } - public Builder maxShardDataStoreExecutorQueueSize(int maxShardDataStoreExecutorQueueSize) { + public Builder maxShardDataStoreExecutorQueueSize(final int maxShardDataStoreExecutorQueueSize) { this.maxShardDataStoreExecutorQueueSize = maxShardDataStoreExecutorQueueSize; return this; } - public Builder useTellBasedProtocol(boolean value) { + public Builder useTellBasedProtocol(final boolean value) { datastoreContext.useTellBasedProtocol = value; return this; } @@ -504,46 +513,52 @@ public class DatastoreContext { * For unit tests only. */ @VisibleForTesting - public Builder shardManagerPersistenceId(String id) { + public Builder shardManagerPersistenceId(final String id) { datastoreContext.shardManagerPersistenceId = id; return this; } - public DatastoreContext build() { - datastoreContext.dataStoreProperties = InMemoryDOMDataStoreConfigProperties.create( - maxShardDataChangeExecutorPoolSize, maxShardDataChangeExecutorQueueSize, - maxShardDataChangeListenerQueueSize, maxShardDataStoreExecutorQueueSize); - - if (datastoreContext.dataStoreName != null) { - GLOBAL_DATASTORE_NAMES.add(datastoreContext.dataStoreName); - } - - return datastoreContext; - } - - public Builder customRaftPolicyImplementation(String customRaftPolicyImplementation) { + public Builder customRaftPolicyImplementation(final String customRaftPolicyImplementation) { datastoreContext.setCustomRaftPolicyImplementation(customRaftPolicyImplementation); return this; } - public Builder shardSnapshotChunkSize(int shardSnapshotChunkSize) { + public Builder shardSnapshotChunkSize(final int shardSnapshotChunkSize) { datastoreContext.setShardSnapshotChunkSize(shardSnapshotChunkSize); return this; } - public Builder shardPeerAddressResolver(PeerAddressResolver resolver) { + public Builder shardPeerAddressResolver(final PeerAddressResolver resolver) { datastoreContext.setPeerAddressResolver(resolver); return this; } - public Builder tempFileDirectory(String tempFileDirectory) { + public Builder tempFileDirectory(final String tempFileDirectory) { datastoreContext.setTempFileDirectory(tempFileDirectory); return this; } - public Builder fileBackedStreamingThresholdInMegabytes(int fileBackedStreamingThreshold) { + public Builder fileBackedStreamingThresholdInMegabytes(final int fileBackedStreamingThreshold) { datastoreContext.setFileBackedStreamingThreshold(fileBackedStreamingThreshold * ConfigParams.MEGABYTE); return this; } + + public Builder syncIndexThreshold(final long syncIndexThreshold) { + datastoreContext.setSyncIndexThreshold(syncIndexThreshold); + return this; + } + + @Override + public DatastoreContext build() { + datastoreContext.dataStoreProperties = InMemoryDOMDataStoreConfigProperties.create( + maxShardDataChangeExecutorPoolSize, maxShardDataChangeExecutorQueueSize, + maxShardDataChangeListenerQueueSize, maxShardDataStoreExecutorQueueSize); + + if (datastoreContext.dataStoreName != null) { + GLOBAL_DATASTORE_NAMES.add(datastoreContext.dataStoreName); + } + + return datastoreContext; + } } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/distributed_datastore_provider/DistributedConfigDataStoreProviderModule.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/distributed_datastore_provider/DistributedConfigDataStoreProviderModule.java index 0847c981ee..a99b8c3a98 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/distributed_datastore_provider/DistributedConfigDataStoreProviderModule.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/distributed_datastore_provider/DistributedConfigDataStoreProviderModule.java @@ -20,13 +20,14 @@ public class DistributedConfigDataStoreProviderModule extends AbstractDistribute private BundleContext bundleContext; public DistributedConfigDataStoreProviderModule( - org.opendaylight.controller.config.api.ModuleIdentifier identifier, - org.opendaylight.controller.config.api.DependencyResolver dependencyResolver) { + final org.opendaylight.controller.config.api.ModuleIdentifier identifier, + final org.opendaylight.controller.config.api.DependencyResolver dependencyResolver) { super(identifier, dependencyResolver); } - public DistributedConfigDataStoreProviderModule(ModuleIdentifier identifier, DependencyResolver dependencyResolver, - DistributedConfigDataStoreProviderModule oldModule, AutoCloseable oldInstance) { + public DistributedConfigDataStoreProviderModule(final ModuleIdentifier identifier, + final DependencyResolver dependencyResolver, final DistributedConfigDataStoreProviderModule oldModule, + final AutoCloseable oldInstance) { super(identifier, dependencyResolver, oldModule, oldInstance); } @@ -36,7 +37,7 @@ public class DistributedConfigDataStoreProviderModule extends AbstractDistribute } @Override - public boolean canReuseInstance(AbstractDistributedConfigDataStoreProviderModule oldModule) { + public boolean canReuseInstance(final AbstractDistributedConfigDataStoreProviderModule oldModule) { return true; } @@ -54,7 +55,7 @@ public class DistributedConfigDataStoreProviderModule extends AbstractDistribute return newDatastoreContext(null); } - private static DatastoreContext newDatastoreContext(ConfigProperties inProps) { + private static DatastoreContext newDatastoreContext(final ConfigProperties inProps) { ConfigProperties props = inProps; if (props == null) { props = new ConfigProperties(); @@ -97,10 +98,11 @@ public class DistributedConfigDataStoreProviderModule extends AbstractDistribute .customRaftPolicyImplementation(props.getCustomRaftPolicyImplementation()) .shardSnapshotChunkSize(props.getShardSnapshotChunkSize().getValue().intValue()) .useTellBasedProtocol(props.getUseTellBasedProtocol()) + .syncIndexThreshold(props.getSyncIndexThreshold().getValue()) .build(); } - public void setBundleContext(BundleContext bundleContext) { + public void setBundleContext(final BundleContext bundleContext) { this.bundleContext = bundleContext; } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/distributed_datastore_provider/DistributedOperationalDataStoreProviderModule.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/distributed_datastore_provider/DistributedOperationalDataStoreProviderModule.java index 2e97c7ee9a..44146aa8bb 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/distributed_datastore_provider/DistributedOperationalDataStoreProviderModule.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/distributed_datastore_provider/DistributedOperationalDataStoreProviderModule.java @@ -20,14 +20,14 @@ public class DistributedOperationalDataStoreProviderModule extends AbstractDistributedOperationalDataStoreProviderModule { private BundleContext bundleContext; - public DistributedOperationalDataStoreProviderModule(ModuleIdentifier identifier, - DependencyResolver dependencyResolver) { + public DistributedOperationalDataStoreProviderModule(final ModuleIdentifier identifier, + final DependencyResolver dependencyResolver) { super(identifier, dependencyResolver); } - public DistributedOperationalDataStoreProviderModule(ModuleIdentifier identifier, - DependencyResolver dependencyResolver,DistributedOperationalDataStoreProviderModule oldModule, - AutoCloseable oldInstance) { + public DistributedOperationalDataStoreProviderModule(final ModuleIdentifier identifier, + final DependencyResolver dependencyResolver,final DistributedOperationalDataStoreProviderModule oldModule, + final AutoCloseable oldInstance) { super(identifier, dependencyResolver, oldModule, oldInstance); } @@ -37,7 +37,7 @@ public class DistributedOperationalDataStoreProviderModule } @Override - public boolean canReuseInstance(AbstractDistributedOperationalDataStoreProviderModule oldModule) { + public boolean canReuseInstance(final AbstractDistributedOperationalDataStoreProviderModule oldModule) { return true; } @@ -55,7 +55,7 @@ public class DistributedOperationalDataStoreProviderModule return newDatastoreContext(null); } - private static DatastoreContext newDatastoreContext(OperationalProperties inProps) { + private static DatastoreContext newDatastoreContext(final OperationalProperties inProps) { OperationalProperties props = inProps; if (props == null) { props = new OperationalProperties(); @@ -98,10 +98,11 @@ public class DistributedOperationalDataStoreProviderModule .customRaftPolicyImplementation(props.getCustomRaftPolicyImplementation()) .shardSnapshotChunkSize(props.getShardSnapshotChunkSize().getValue().intValue()) .useTellBasedProtocol(props.getUseTellBasedProtocol()) + .syncIndexThreshold(props.getSyncIndexThreshold().getValue()) .build(); } - public void setBundleContext(BundleContext bundleContext) { + public void setBundleContext(final BundleContext bundleContext) { this.bundleContext = bundleContext; } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/yang/distributed-datastore-provider.yang b/opendaylight/md-sal/sal-distributed-datastore/src/main/yang/distributed-datastore-provider.yang index d14c828809..49a51a625c 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/yang/distributed-datastore-provider.yang +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/yang/distributed-datastore-provider.yang @@ -248,6 +248,14 @@ module distributed-datastore-provider { is the threshold in terms of number of megabytes before it should switch from storing in memory to buffering to a file."; } + + leaf sync-index-threshold { + default 10; + type non-zero-uint32-type; + description "Permitted synchronization lag, expressed in terms of RAFT entry count. It a follower's + commitIndex trails the leader's journal by more than this amount of entries the follower + is considered to be out-of-sync."; + } } // Augments the 'configuration' choice node under modules/module. -- 2.36.6