BUG-8618: make sync threshold tuneable 40/59040/1
authorRobert Varga <robert.varga@pantheon.tech>
Thu, 15 Jun 2017 01:13:47 +0000 (03:13 +0200)
committerTom Pantelis <tompantelis@gmail.com>
Thu, 15 Jun 2017 17:12:06 +0000 (17:12 +0000)
We are observing quite a few of these transitions, which may be coming
from batching scenarios. Introduce sync-index-threshold config knob
to expose control over it.

Change-Id: Ief4c89c2fe5b95cebaf3fb83cbcdda37cac126b6
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
(cherry picked from commit 890e4bbf40aee318a2174bd4130cf34437e5617b)

opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ConfigParams.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/DefaultConfigParamsImpl.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java
opendaylight/md-sal/sal-clustering-config/src/main/resources/initial/datastore.cfg
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContext.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/distributed_datastore_provider/DistributedConfigDataStoreProviderModule.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/distributed_datastore_provider/DistributedOperationalDataStoreProviderModule.java
opendaylight/md-sal/sal-distributed-datastore/src/main/yang/distributed-datastore-provider.yang

index 86ce311..8d6b965 100644 (file)
@@ -131,4 +131,12 @@ public interface ConfigParams {
      * @return the threshold in terms of number of bytes.
      */
     int getFileBackedStreamingThreshold();
+
+    /**
+     * Returns the threshold in terms of number journal entries that we can lag behind a leader until we raise a
+     * 'not synced' transition.
+     *
+     * @return the threshold in terms of number of journal entries.
+     */
+    long getSyncIndexThreshold();
 }
index 56fb633..6450285 100644 (file)
@@ -49,6 +49,8 @@ public class DefaultConfigParamsImpl implements ConfigParams {
     public static final FiniteDuration HEART_BEAT_INTERVAL =
         new FiniteDuration(100, TimeUnit.MILLISECONDS);
 
+    private final Supplier<RaftPolicy> policySupplier = Suppliers.memoize(this::getPolicy);
+
     private FiniteDuration heartBeatInterval = HEART_BEAT_INTERVAL;
     private long snapshotBatchCount = SNAPSHOT_BATCH_COUNT;
     private int journalRecoveryLogBatchSize = JOURNAL_RECOVERY_LOG_BATCH_SIZE;
@@ -64,53 +66,53 @@ public class DefaultConfigParamsImpl implements ConfigParams {
     private long electionTimeoutFactor = 2;
     private String customRaftPolicyImplementationClass;
 
-    private final Supplier<RaftPolicy> policySupplier = Suppliers.memoize(new PolicySupplier());
-
     private PeerAddressResolver peerAddressResolver = NoopPeerAddressResolver.INSTANCE;
 
     private String tempFileDirectory = "";
 
     private int fileBackedStreamingThreshold = 128 * MEGABYTE;
 
-    public void setHeartBeatInterval(FiniteDuration heartBeatInterval) {
+    private long syncIndexThreshold = 10;
+
+    public void setHeartBeatInterval(final FiniteDuration heartBeatInterval) {
         this.heartBeatInterval = heartBeatInterval;
         electionTimeOutInterval = null;
     }
 
-    public void setSnapshotBatchCount(long snapshotBatchCount) {
+    public void setSnapshotBatchCount(final long snapshotBatchCount) {
         this.snapshotBatchCount = snapshotBatchCount;
     }
 
-    public void setSnapshotDataThresholdPercentage(int snapshotDataThresholdPercentage) {
+    public void setSnapshotDataThresholdPercentage(final int snapshotDataThresholdPercentage) {
         this.snapshotDataThresholdPercentage = snapshotDataThresholdPercentage;
     }
 
-    public void setSnapshotChunkSize(int snapshotChunkSize) {
+    public void setSnapshotChunkSize(final int snapshotChunkSize) {
         this.snapshotChunkSize = snapshotChunkSize;
     }
 
-    public void setJournalRecoveryLogBatchSize(int journalRecoveryLogBatchSize) {
+    public void setJournalRecoveryLogBatchSize(final int journalRecoveryLogBatchSize) {
         this.journalRecoveryLogBatchSize = journalRecoveryLogBatchSize;
     }
 
-    public void setIsolatedLeaderCheckInterval(FiniteDuration isolatedLeaderCheckInterval) {
+    public void setIsolatedLeaderCheckInterval(final FiniteDuration isolatedLeaderCheckInterval) {
         this.isolatedLeaderCheckInterval = isolatedLeaderCheckInterval.toMillis();
     }
 
-    public void setElectionTimeoutFactor(long electionTimeoutFactor) {
+    public void setElectionTimeoutFactor(final long electionTimeoutFactor) {
         this.electionTimeoutFactor = electionTimeoutFactor;
         electionTimeOutInterval = null;
     }
 
-    public void setTempFileDirectory(String tempFileDirectory) {
+    public void setTempFileDirectory(final String tempFileDirectory) {
         this.tempFileDirectory = tempFileDirectory;
     }
 
-    public void setFileBackedStreamingThreshold(int fileBackedStreamingThreshold) {
+    public void setFileBackedStreamingThreshold(final int fileBackedStreamingThreshold) {
         this.fileBackedStreamingThreshold = fileBackedStreamingThreshold;
     }
 
-    public void setCustomRaftPolicyImplementationClass(String customRaftPolicyImplementationClass) {
+    public void setCustomRaftPolicyImplementationClass(final String customRaftPolicyImplementationClass) {
         this.customRaftPolicyImplementationClass = customRaftPolicyImplementationClass;
     }
 
@@ -184,37 +186,45 @@ public class DefaultConfigParamsImpl implements ConfigParams {
         return fileBackedStreamingThreshold;
     }
 
-    private class PolicySupplier implements Supplier<RaftPolicy> {
-        @Override
-        @SuppressWarnings("checkstyle:IllegalCatch")
-        public RaftPolicy get() {
-            if (Strings.isNullOrEmpty(DefaultConfigParamsImpl.this.customRaftPolicyImplementationClass)) {
-                LOG.debug("No custom RaftPolicy specified. Using DefaultRaftPolicy");
-                return DefaultRaftPolicy.INSTANCE;
-            }
-
-            try {
-                String className = DefaultConfigParamsImpl.this.customRaftPolicyImplementationClass;
-                LOG.info("Trying to use custom RaftPolicy {}", className);
-                return (RaftPolicy)Class.forName(className).newInstance();
-            } catch (Exception e) {
-                if (LOG.isDebugEnabled()) {
-                    LOG.error("Could not create custom raft policy, will stick with default", e);
-                } else {
-                    LOG.error("Could not create custom raft policy, will stick with default : cause = {}",
-                            e.getMessage());
-                }
-            }
-            return DefaultRaftPolicy.INSTANCE;
-        }
-    }
 
     @Override
     public PeerAddressResolver getPeerAddressResolver() {
         return peerAddressResolver;
     }
 
-    public void setPeerAddressResolver(@Nonnull PeerAddressResolver peerAddressResolver) {
+    public void setPeerAddressResolver(@Nonnull final PeerAddressResolver peerAddressResolver) {
         this.peerAddressResolver = Preconditions.checkNotNull(peerAddressResolver);
     }
+
+    @Override
+    public long getSyncIndexThreshold() {
+        return syncIndexThreshold;
+    }
+
+    public void setSyncIndexThreshold(final long syncIndexThreshold) {
+        Preconditions.checkArgument(syncIndexThreshold >= 0);
+        this.syncIndexThreshold = syncIndexThreshold;
+    }
+
+    @SuppressWarnings("checkstyle:IllegalCatch")
+    private RaftPolicy getPolicy() {
+        if (Strings.isNullOrEmpty(DefaultConfigParamsImpl.this.customRaftPolicyImplementationClass)) {
+            LOG.debug("No custom RaftPolicy specified. Using DefaultRaftPolicy");
+            return DefaultRaftPolicy.INSTANCE;
+        }
+
+        try {
+            String className = DefaultConfigParamsImpl.this.customRaftPolicyImplementationClass;
+            LOG.info("Trying to use custom RaftPolicy {}", className);
+            return (RaftPolicy)Class.forName(className).newInstance();
+        } catch (Exception e) {
+            if (LOG.isDebugEnabled()) {
+                LOG.error("Could not create custom raft policy, will stick with default", e);
+            } else {
+                LOG.error("Could not create custom raft policy, will stick with default : cause = {}",
+                    e.getMessage());
+            }
+        }
+        return DefaultRaftPolicy.INSTANCE;
+    }
 }
index b512089..c35de82 100644 (file)
@@ -51,8 +51,6 @@ import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
  * </ul>
  */
 public class Follower extends AbstractRaftActorBehavior {
-    private static final int SYNC_THRESHOLD = 10;
-
     private static final long MAX_ELECTION_TIMEOUT_FACTOR = 18;
 
     private final SyncStatusTracker initialSyncStatusTracker;
@@ -62,16 +60,18 @@ public class Follower extends AbstractRaftActorBehavior {
     private String leaderId;
     private short leaderPayloadVersion;
 
-    public Follower(RaftActorContext context) {
+    public Follower(final RaftActorContext context) {
         this(context, null, (short)-1);
     }
 
-    public Follower(RaftActorContext context, String initialLeaderId, short initialLeaderPayloadVersion) {
+    public Follower(final RaftActorContext context, final String initialLeaderId,
+            final short initialLeaderPayloadVersion) {
         super(context, RaftState.Follower);
         this.leaderId = initialLeaderId;
         this.leaderPayloadVersion = initialLeaderPayloadVersion;
 
-        initialSyncStatusTracker = new SyncStatusTracker(context.getActor(), getId(), SYNC_THRESHOLD);
+        initialSyncStatusTracker = new SyncStatusTracker(context.getActor(), getId(), context.getConfigParams()
+            .getSyncIndexThreshold());
 
         if (context.getPeerIds().isEmpty() && getLeaderId() == null) {
             actor().tell(TimeoutNow.INSTANCE, actor());
@@ -96,7 +96,7 @@ public class Follower extends AbstractRaftActorBehavior {
     }
 
     @VisibleForTesting
-    protected final void setLeaderPayloadVersion(short leaderPayloadVersion) {
+    protected final void setLeaderPayloadVersion(final short leaderPayloadVersion) {
         this.leaderPayloadVersion = leaderPayloadVersion;
     }
 
@@ -108,7 +108,7 @@ public class Follower extends AbstractRaftActorBehavior {
         lastLeaderMessageTimer.start();
     }
 
-    private boolean isLogEntryPresent(long index) {
+    private boolean isLogEntryPresent(final long index) {
         if (context.getReplicatedLog().isInSnapshot(index)) {
             return true;
         }
@@ -118,12 +118,12 @@ public class Follower extends AbstractRaftActorBehavior {
 
     }
 
-    private void updateInitialSyncStatus(long currentLeaderCommit, String newLeaderId) {
+    private void updateInitialSyncStatus(final long currentLeaderCommit, final String newLeaderId) {
         initialSyncStatusTracker.update(newLeaderId, currentLeaderCommit, context.getCommitIndex());
     }
 
     @Override
-    protected RaftActorBehavior handleAppendEntries(ActorRef sender, AppendEntries appendEntries) {
+    protected RaftActorBehavior handleAppendEntries(final ActorRef sender, final AppendEntries appendEntries) {
 
         int numLogEntries = appendEntries.getEntries() != null ? appendEntries.getEntries().size() : 0;
         if (log.isTraceEnabled()) {
@@ -317,7 +317,7 @@ public class Follower extends AbstractRaftActorBehavior {
         return this;
     }
 
-    private boolean isOutOfSync(AppendEntries appendEntries) {
+    private boolean isOutOfSync(final AppendEntries appendEntries) {
 
         long prevLogTerm = getLogEntryTerm(appendEntries.getPrevLogIndex());
         boolean prevEntryPresent = isLogEntryPresent(appendEntries.getPrevLogIndex());
@@ -371,19 +371,19 @@ public class Follower extends AbstractRaftActorBehavior {
     }
 
     @Override
-    protected RaftActorBehavior handleAppendEntriesReply(ActorRef sender,
-        AppendEntriesReply appendEntriesReply) {
+    protected RaftActorBehavior handleAppendEntriesReply(final ActorRef sender,
+        final AppendEntriesReply appendEntriesReply) {
         return this;
     }
 
     @Override
-    protected RaftActorBehavior handleRequestVoteReply(ActorRef sender,
-        RequestVoteReply requestVoteReply) {
+    protected RaftActorBehavior handleRequestVoteReply(final ActorRef sender,
+        final RequestVoteReply requestVoteReply) {
         return this;
     }
 
     @Override
-    public RaftActorBehavior handleMessage(ActorRef sender, Object message) {
+    public RaftActorBehavior handleMessage(final ActorRef sender, final Object message) {
         if (message instanceof ElectionTimeout || message instanceof TimeoutNow) {
             return handleElectionTimeout(message);
         }
@@ -419,7 +419,7 @@ public class Follower extends AbstractRaftActorBehavior {
         return super.handleMessage(sender, rpc);
     }
 
-    private RaftActorBehavior handleElectionTimeout(Object message) {
+    private RaftActorBehavior handleElectionTimeout(final Object message) {
         // If the message is ElectionTimeout, verify we haven't actually seen a message from the leader
         // during the election timeout interval. It may that the election timer expired b/c this actor
         // was busy and messages got delayed, in which case leader messages would be backed up in the
@@ -512,7 +512,7 @@ public class Follower extends AbstractRaftActorBehavior {
         return false;
     }
 
-    private void handleInstallSnapshot(final ActorRef sender, InstallSnapshot installSnapshot) {
+    private void handleInstallSnapshot(final ActorRef sender, final InstallSnapshot installSnapshot) {
 
         log.debug("{}: handleInstallSnapshot: {}", logName(), installSnapshot);
 
index 6d4d3b4..3ab714a 100644 (file)
@@ -84,3 +84,7 @@ operational.persistent=false
 # Enable tell-based protocol between frontend (applications) and backend (shards). Using this protocol
 # should avoid AskTimeoutExceptions seen under heavy load. Defaults to false (use ask-based protocol).
 #use-tell-based-protocol=true
+
+# Tune the maximum number of entries a follower is allowed to lag behind the leader before it is
+# considered out-of-sync. This flag may require tuning in face of a large number of small transactions.
+#sync-index-threshold=10
index eeb6ad3..02f2768 100644 (file)
@@ -11,8 +11,8 @@ package org.opendaylight.controller.cluster.datastore;
 import akka.util.Timeout;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
-import com.google.common.collect.Sets;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import org.apache.commons.lang3.text.WordUtils;
 import org.opendaylight.controller.cluster.common.actor.AkkaConfigurationReader;
@@ -56,7 +56,11 @@ public class DatastoreContext {
             TimeUnit.MILLISECONDS.convert(2, TimeUnit.MINUTES);
     public static final int DEFAULT_SHARD_SNAPSHOT_CHUNK_SIZE = 2048000;
 
-    private static final Set<String> GLOBAL_DATASTORE_NAMES = Sets.newConcurrentHashSet();
+    public static final long DEFAULT_SYNC_INDEX_THRESHOLD = 10;
+
+    private static final Set<String> GLOBAL_DATASTORE_NAMES = ConcurrentHashMap.newKeySet();
+
+    private final DefaultConfigParamsImpl raftConfig = new DefaultConfigParamsImpl();
 
     private InMemoryDOMDataStoreConfigProperties dataStoreProperties;
     private Duration shardTransactionIdleTimeout = DatastoreContext.DEFAULT_SHARD_TRANSACTION_IDLE_TIMEOUT;
@@ -69,7 +73,6 @@ public class DatastoreContext {
     private boolean persistent = DEFAULT_PERSISTENT;
     private AkkaConfigurationReader configurationReader = DEFAULT_CONFIGURATION_READER;
     private long transactionCreationInitialRateLimit = DEFAULT_TX_CREATION_INITIAL_RATE_LIMIT;
-    private final DefaultConfigParamsImpl raftConfig = new DefaultConfigParamsImpl();
     private String dataStoreName = UNKNOWN_DATA_STORE_TYPE;
     private LogicalDatastoreType logicalStoreType = LogicalDatastoreType.OPERATIONAL;
     private YangInstanceIdentifier storeRoot = YangInstanceIdentifier.EMPTY;
@@ -92,6 +95,7 @@ public class DatastoreContext {
         setSnapshotDataThresholdPercentage(DEFAULT_SHARD_SNAPSHOT_DATA_THRESHOLD_PERCENTAGE);
         setElectionTimeoutFactor(DEFAULT_SHARD_ELECTION_TIMEOUT_FACTOR);
         setShardSnapshotChunkSize(DEFAULT_SHARD_SNAPSHOT_CHUNK_SIZE);
+        setSyncIndexThreshold(DEFAULT_SYNC_INDEX_THRESHOLD);
     }
 
     private DatastoreContext(final DatastoreContext other) {
@@ -127,13 +131,14 @@ public class DatastoreContext {
         setPeerAddressResolver(other.raftConfig.getPeerAddressResolver());
         setTempFileDirectory(other.getTempFileDirectory());
         setFileBackedStreamingThreshold(other.getFileBackedStreamingThreshold());
+        setSyncIndexThreshold(other.raftConfig.getSyncIndexThreshold());
     }
 
     public static Builder newBuilder() {
         return new Builder(new DatastoreContext());
     }
 
-    public static Builder newBuilderFrom(DatastoreContext context) {
+    public static Builder newBuilderFrom(final DatastoreContext context) {
         return new Builder(new DatastoreContext(context));
     }
 
@@ -209,7 +214,7 @@ public class DatastoreContext {
         return raftConfig.getTempFileDirectory();
     }
 
-    private void setTempFileDirectory(String tempFileDirectory) {
+    private void setTempFileDirectory(final String tempFileDirectory) {
         raftConfig.setTempFileDirectory(tempFileDirectory);
     }
 
@@ -217,51 +222,55 @@ public class DatastoreContext {
         return raftConfig.getFileBackedStreamingThreshold();
     }
 
-    private void setFileBackedStreamingThreshold(int fileBackedStreamingThreshold) {
+    private void setFileBackedStreamingThreshold(final int fileBackedStreamingThreshold) {
         raftConfig.setFileBackedStreamingThreshold(fileBackedStreamingThreshold);
     }
 
-    private void setPeerAddressResolver(PeerAddressResolver resolver) {
+    private void setPeerAddressResolver(final PeerAddressResolver resolver) {
         raftConfig.setPeerAddressResolver(resolver);
     }
 
-    private void setHeartbeatInterval(long shardHeartbeatIntervalInMillis) {
+    private void setHeartbeatInterval(final long shardHeartbeatIntervalInMillis) {
         raftConfig.setHeartBeatInterval(new FiniteDuration(shardHeartbeatIntervalInMillis,
                 TimeUnit.MILLISECONDS));
     }
 
-    private void setShardJournalRecoveryLogBatchSize(int shardJournalRecoveryLogBatchSize) {
+    private void setShardJournalRecoveryLogBatchSize(final int shardJournalRecoveryLogBatchSize) {
         raftConfig.setJournalRecoveryLogBatchSize(shardJournalRecoveryLogBatchSize);
     }
 
 
-    private void setIsolatedLeaderCheckInterval(long shardIsolatedLeaderCheckIntervalInMillis) {
+    private void setIsolatedLeaderCheckInterval(final long shardIsolatedLeaderCheckIntervalInMillis) {
         raftConfig.setIsolatedLeaderCheckInterval(
                 new FiniteDuration(shardIsolatedLeaderCheckIntervalInMillis, TimeUnit.MILLISECONDS));
     }
 
-    private void setElectionTimeoutFactor(long shardElectionTimeoutFactor) {
+    private void setElectionTimeoutFactor(final long shardElectionTimeoutFactor) {
         raftConfig.setElectionTimeoutFactor(shardElectionTimeoutFactor);
     }
 
-    private void setCustomRaftPolicyImplementation(String customRaftPolicyImplementation) {
+    private void setCustomRaftPolicyImplementation(final String customRaftPolicyImplementation) {
         raftConfig.setCustomRaftPolicyImplementationClass(customRaftPolicyImplementation);
     }
 
-    private void setSnapshotDataThresholdPercentage(int shardSnapshotDataThresholdPercentage) {
+    private void setSnapshotDataThresholdPercentage(final int shardSnapshotDataThresholdPercentage) {
         Preconditions.checkArgument(shardSnapshotDataThresholdPercentage >= 0
                 && shardSnapshotDataThresholdPercentage <= 100);
         raftConfig.setSnapshotDataThresholdPercentage(shardSnapshotDataThresholdPercentage);
     }
 
-    private void setSnapshotBatchCount(long shardSnapshotBatchCount) {
+    private void setSnapshotBatchCount(final long shardSnapshotBatchCount) {
         raftConfig.setSnapshotBatchCount(shardSnapshotBatchCount);
     }
 
-    private void setShardSnapshotChunkSize(int shardSnapshotChunkSize) {
+    private void setShardSnapshotChunkSize(final int shardSnapshotChunkSize) {
         raftConfig.setSnapshotChunkSize(shardSnapshotChunkSize);
     }
 
+    private void setSyncIndexThreshold(final long syncIndexThreshold) {
+        raftConfig.setSyncIndexThreshold(syncIndexThreshold);
+    }
+
     public int getShardBatchedModificationCount() {
         return shardBatchedModificationCount;
     }
@@ -286,7 +295,7 @@ public class DatastoreContext {
         return raftConfig.getSnapshotChunkSize();
     }
 
-    public static class Builder {
+    public static class Builder implements org.opendaylight.yangtools.concepts.Builder<DatastoreContext> {
         private final DatastoreContext datastoreContext;
         private int maxShardDataChangeExecutorPoolSize =
                 InMemoryDOMDataStoreConfigProperties.DEFAULT_MAX_DATA_CHANGE_EXECUTOR_POOL_SIZE;
@@ -297,7 +306,7 @@ public class DatastoreContext {
         private int maxShardDataStoreExecutorQueueSize =
                 InMemoryDOMDataStoreConfigProperties.DEFAULT_MAX_DATA_STORE_EXECUTOR_QUEUE_SIZE;
 
-        private Builder(DatastoreContext datastoreContext) {
+        private Builder(final DatastoreContext datastoreContext) {
             this.datastoreContext = datastoreContext;
 
             if (datastoreContext.getDataStoreProperties() != null) {
@@ -312,115 +321,115 @@ public class DatastoreContext {
             }
         }
 
-        public Builder boundedMailboxCapacity(int boundedMailboxCapacity) {
+        public Builder boundedMailboxCapacity(final int boundedMailboxCapacity) {
             // TODO - this is defined in the yang DataStoreProperties but not currently used.
             return this;
         }
 
-        public Builder enableMetricCapture(boolean enableMetricCapture) {
+        public Builder enableMetricCapture(final boolean enableMetricCapture) {
             // TODO - this is defined in the yang DataStoreProperties but not currently used.
             return this;
         }
 
 
-        public Builder shardTransactionIdleTimeout(long timeout, TimeUnit unit) {
+        public Builder shardTransactionIdleTimeout(final long timeout, final TimeUnit unit) {
             datastoreContext.shardTransactionIdleTimeout = Duration.create(timeout, unit);
             return this;
         }
 
-        public Builder shardTransactionIdleTimeoutInMinutes(long timeout) {
+        public Builder shardTransactionIdleTimeoutInMinutes(final long timeout) {
             return shardTransactionIdleTimeout(timeout, TimeUnit.MINUTES);
         }
 
-        public Builder operationTimeoutInSeconds(int operationTimeoutInSeconds) {
+        public Builder operationTimeoutInSeconds(final int operationTimeoutInSeconds) {
             datastoreContext.operationTimeoutInMillis = TimeUnit.SECONDS.toMillis(operationTimeoutInSeconds);
             return this;
         }
 
-        public Builder operationTimeoutInMillis(long operationTimeoutInMillis) {
+        public Builder operationTimeoutInMillis(final long operationTimeoutInMillis) {
             datastoreContext.operationTimeoutInMillis = operationTimeoutInMillis;
             return this;
         }
 
-        public Builder dataStoreMXBeanType(String dataStoreMXBeanType) {
+        public Builder dataStoreMXBeanType(final String dataStoreMXBeanType) {
             datastoreContext.dataStoreMXBeanType = dataStoreMXBeanType;
             return this;
         }
 
-        public Builder shardTransactionCommitTimeoutInSeconds(int shardTransactionCommitTimeoutInSeconds) {
+        public Builder shardTransactionCommitTimeoutInSeconds(final int shardTransactionCommitTimeoutInSeconds) {
             datastoreContext.shardTransactionCommitTimeoutInSeconds = shardTransactionCommitTimeoutInSeconds;
             return this;
         }
 
-        public Builder shardJournalRecoveryLogBatchSize(int shardJournalRecoveryLogBatchSize) {
+        public Builder shardJournalRecoveryLogBatchSize(final int shardJournalRecoveryLogBatchSize) {
             datastoreContext.setShardJournalRecoveryLogBatchSize(shardJournalRecoveryLogBatchSize);
             return this;
         }
 
-        public Builder shardSnapshotBatchCount(int shardSnapshotBatchCount) {
+        public Builder shardSnapshotBatchCount(final int shardSnapshotBatchCount) {
             datastoreContext.setSnapshotBatchCount(shardSnapshotBatchCount);
             return this;
         }
 
-        public Builder shardSnapshotDataThresholdPercentage(int shardSnapshotDataThresholdPercentage) {
+        public Builder shardSnapshotDataThresholdPercentage(final int shardSnapshotDataThresholdPercentage) {
             datastoreContext.setSnapshotDataThresholdPercentage(shardSnapshotDataThresholdPercentage);
             return this;
         }
 
-        public Builder shardHeartbeatIntervalInMillis(int shardHeartbeatIntervalInMillis) {
+        public Builder shardHeartbeatIntervalInMillis(final int shardHeartbeatIntervalInMillis) {
             datastoreContext.setHeartbeatInterval(shardHeartbeatIntervalInMillis);
             return this;
         }
 
-        public Builder shardTransactionCommitQueueCapacity(int shardTransactionCommitQueueCapacity) {
+        public Builder shardTransactionCommitQueueCapacity(final int shardTransactionCommitQueueCapacity) {
             datastoreContext.shardTransactionCommitQueueCapacity = shardTransactionCommitQueueCapacity;
             return this;
         }
 
-        public Builder shardInitializationTimeout(long timeout, TimeUnit unit) {
+        public Builder shardInitializationTimeout(final long timeout, final TimeUnit unit) {
             datastoreContext.shardInitializationTimeout = new Timeout(timeout, unit);
             return this;
         }
 
-        public Builder shardInitializationTimeoutInSeconds(long timeout) {
+        public Builder shardInitializationTimeoutInSeconds(final long timeout) {
             return shardInitializationTimeout(timeout, TimeUnit.SECONDS);
         }
 
-        public Builder shardLeaderElectionTimeout(long timeout, TimeUnit unit) {
+        public Builder shardLeaderElectionTimeout(final long timeout, final TimeUnit unit) {
             datastoreContext.shardLeaderElectionTimeout = new Timeout(timeout, unit);
             return this;
         }
 
-        public Builder shardLeaderElectionTimeoutInSeconds(long timeout) {
+        public Builder shardLeaderElectionTimeoutInSeconds(final long timeout) {
             return shardLeaderElectionTimeout(timeout, TimeUnit.SECONDS);
         }
 
-        public Builder configurationReader(AkkaConfigurationReader configurationReader) {
+        public Builder configurationReader(final AkkaConfigurationReader configurationReader) {
             datastoreContext.configurationReader = configurationReader;
             return this;
         }
 
-        public Builder persistent(boolean persistent) {
+        public Builder persistent(final boolean persistent) {
             datastoreContext.persistent = persistent;
             return this;
         }
 
-        public Builder shardIsolatedLeaderCheckIntervalInMillis(int shardIsolatedLeaderCheckIntervalInMillis) {
+        public Builder shardIsolatedLeaderCheckIntervalInMillis(final int shardIsolatedLeaderCheckIntervalInMillis) {
             datastoreContext.setIsolatedLeaderCheckInterval(shardIsolatedLeaderCheckIntervalInMillis);
             return this;
         }
 
-        public Builder shardElectionTimeoutFactor(long shardElectionTimeoutFactor) {
+        public Builder shardElectionTimeoutFactor(final long shardElectionTimeoutFactor) {
             datastoreContext.setElectionTimeoutFactor(shardElectionTimeoutFactor);
             return this;
         }
 
-        public Builder transactionCreationInitialRateLimit(long initialRateLimit) {
+        public Builder transactionCreationInitialRateLimit(final long initialRateLimit) {
             datastoreContext.transactionCreationInitialRateLimit = initialRateLimit;
             return this;
         }
 
-        public Builder logicalStoreType(LogicalDatastoreType logicalStoreType) {
+        public Builder logicalStoreType(final LogicalDatastoreType logicalStoreType) {
             datastoreContext.logicalStoreType = Preconditions.checkNotNull(logicalStoreType);
 
             // Retain compatible naming
@@ -443,7 +452,7 @@ public class DatastoreContext {
             return this;
         }
 
-        public Builder dataStoreName(String dataStoreName) {
+        public Builder dataStoreName(final String dataStoreName) {
             datastoreContext.dataStoreName = Preconditions.checkNotNull(dataStoreName);
             datastoreContext.dataStoreMXBeanType = "Distributed" + WordUtils.capitalize(dataStoreName) + "Datastore";
             return this;
@@ -454,48 +463,48 @@ public class DatastoreContext {
             return this;
         }
 
-        public Builder writeOnlyTransactionOptimizationsEnabled(boolean value) {
+        public Builder writeOnlyTransactionOptimizationsEnabled(final boolean value) {
             datastoreContext.writeOnlyTransactionOptimizationsEnabled = value;
             return this;
         }
 
-        public Builder shardCommitQueueExpiryTimeoutInMillis(long value) {
+        public Builder shardCommitQueueExpiryTimeoutInMillis(final long value) {
             datastoreContext.shardCommitQueueExpiryTimeoutInMillis = value;
             return this;
         }
 
-        public Builder shardCommitQueueExpiryTimeoutInSeconds(long value) {
+        public Builder shardCommitQueueExpiryTimeoutInSeconds(final long value) {
             datastoreContext.shardCommitQueueExpiryTimeoutInMillis = TimeUnit.MILLISECONDS.convert(
                     value, TimeUnit.SECONDS);
             return this;
         }
 
-        public Builder transactionDebugContextEnabled(boolean value) {
+        public Builder transactionDebugContextEnabled(final boolean value) {
             datastoreContext.transactionDebugContextEnabled = value;
             return this;
         }
 
-        public Builder maxShardDataChangeExecutorPoolSize(int maxShardDataChangeExecutorPoolSize) {
+        public Builder maxShardDataChangeExecutorPoolSize(final int maxShardDataChangeExecutorPoolSize) {
             this.maxShardDataChangeExecutorPoolSize = maxShardDataChangeExecutorPoolSize;
             return this;
         }
 
-        public Builder maxShardDataChangeExecutorQueueSize(int maxShardDataChangeExecutorQueueSize) {
+        public Builder maxShardDataChangeExecutorQueueSize(final int maxShardDataChangeExecutorQueueSize) {
             this.maxShardDataChangeExecutorQueueSize = maxShardDataChangeExecutorQueueSize;
             return this;
         }
 
-        public Builder maxShardDataChangeListenerQueueSize(int maxShardDataChangeListenerQueueSize) {
+        public Builder maxShardDataChangeListenerQueueSize(final int maxShardDataChangeListenerQueueSize) {
             this.maxShardDataChangeListenerQueueSize = maxShardDataChangeListenerQueueSize;
             return this;
         }
 
-        public Builder maxShardDataStoreExecutorQueueSize(int maxShardDataStoreExecutorQueueSize) {
+        public Builder maxShardDataStoreExecutorQueueSize(final int maxShardDataStoreExecutorQueueSize) {
             this.maxShardDataStoreExecutorQueueSize = maxShardDataStoreExecutorQueueSize;
             return this;
         }
 
-        public Builder useTellBasedProtocol(boolean value) {
+        public Builder useTellBasedProtocol(final boolean value) {
             datastoreContext.useTellBasedProtocol = value;
             return this;
         }
@@ -504,46 +513,52 @@ public class DatastoreContext {
          * For unit tests only.
          */
         @VisibleForTesting
-        public Builder shardManagerPersistenceId(String id) {
+        public Builder shardManagerPersistenceId(final String id) {
             datastoreContext.shardManagerPersistenceId = id;
             return this;
         }
 
-        public DatastoreContext build() {
-            datastoreContext.dataStoreProperties = InMemoryDOMDataStoreConfigProperties.create(
-                    maxShardDataChangeExecutorPoolSize, maxShardDataChangeExecutorQueueSize,
-                    maxShardDataChangeListenerQueueSize, maxShardDataStoreExecutorQueueSize);
-
-            if (datastoreContext.dataStoreName != null) {
-                GLOBAL_DATASTORE_NAMES.add(datastoreContext.dataStoreName);
-            }
-
-            return datastoreContext;
-        }
-
-        public Builder customRaftPolicyImplementation(String customRaftPolicyImplementation) {
+        public Builder customRaftPolicyImplementation(final String customRaftPolicyImplementation) {
             datastoreContext.setCustomRaftPolicyImplementation(customRaftPolicyImplementation);
             return this;
         }
 
-        public Builder shardSnapshotChunkSize(int shardSnapshotChunkSize) {
+        public Builder shardSnapshotChunkSize(final int shardSnapshotChunkSize) {
             datastoreContext.setShardSnapshotChunkSize(shardSnapshotChunkSize);
             return this;
         }
 
-        public Builder shardPeerAddressResolver(PeerAddressResolver resolver) {
+        public Builder shardPeerAddressResolver(final PeerAddressResolver resolver) {
             datastoreContext.setPeerAddressResolver(resolver);
             return this;
         }
 
-        public Builder tempFileDirectory(String tempFileDirectory) {
+        public Builder tempFileDirectory(final String tempFileDirectory) {
             datastoreContext.setTempFileDirectory(tempFileDirectory);
             return this;
         }
 
-        public Builder fileBackedStreamingThresholdInMegabytes(int  fileBackedStreamingThreshold) {
+        public Builder fileBackedStreamingThresholdInMegabytes(final int  fileBackedStreamingThreshold) {
             datastoreContext.setFileBackedStreamingThreshold(fileBackedStreamingThreshold * ConfigParams.MEGABYTE);
             return this;
         }
+
+        public Builder syncIndexThreshold(final long syncIndexThreshold) {
+            datastoreContext.setSyncIndexThreshold(syncIndexThreshold);
+            return this;
+        }
+
+        @Override
+        public DatastoreContext build() {
+            datastoreContext.dataStoreProperties = InMemoryDOMDataStoreConfigProperties.create(
+                    maxShardDataChangeExecutorPoolSize, maxShardDataChangeExecutorQueueSize,
+                    maxShardDataChangeListenerQueueSize, maxShardDataStoreExecutorQueueSize);
+
+            if (datastoreContext.dataStoreName != null) {
+                GLOBAL_DATASTORE_NAMES.add(datastoreContext.dataStoreName);
+            }
+
+            return datastoreContext;
+        }
     }
 }
index 0847c98..a99b8c3 100644 (file)
@@ -20,13 +20,14 @@ public class DistributedConfigDataStoreProviderModule extends AbstractDistribute
     private BundleContext bundleContext;
 
     public DistributedConfigDataStoreProviderModule(
-        org.opendaylight.controller.config.api.ModuleIdentifier identifier,
-        org.opendaylight.controller.config.api.DependencyResolver dependencyResolver) {
+        final org.opendaylight.controller.config.api.ModuleIdentifier identifier,
+        final org.opendaylight.controller.config.api.DependencyResolver dependencyResolver) {
         super(identifier, dependencyResolver);
     }
 
-    public DistributedConfigDataStoreProviderModule(ModuleIdentifier identifier, DependencyResolver dependencyResolver,
-            DistributedConfigDataStoreProviderModule oldModule, AutoCloseable oldInstance) {
+    public DistributedConfigDataStoreProviderModule(final ModuleIdentifier identifier,
+            final DependencyResolver dependencyResolver, final DistributedConfigDataStoreProviderModule oldModule,
+            final AutoCloseable oldInstance) {
         super(identifier, dependencyResolver, oldModule, oldInstance);
     }
 
@@ -36,7 +37,7 @@ public class DistributedConfigDataStoreProviderModule extends AbstractDistribute
     }
 
     @Override
-    public boolean canReuseInstance(AbstractDistributedConfigDataStoreProviderModule oldModule) {
+    public boolean canReuseInstance(final AbstractDistributedConfigDataStoreProviderModule oldModule) {
         return true;
     }
 
@@ -54,7 +55,7 @@ public class DistributedConfigDataStoreProviderModule extends AbstractDistribute
         return newDatastoreContext(null);
     }
 
-    private static DatastoreContext newDatastoreContext(ConfigProperties inProps) {
+    private static DatastoreContext newDatastoreContext(final ConfigProperties inProps) {
         ConfigProperties props = inProps;
         if (props == null) {
             props = new ConfigProperties();
@@ -97,10 +98,11 @@ public class DistributedConfigDataStoreProviderModule extends AbstractDistribute
                 .customRaftPolicyImplementation(props.getCustomRaftPolicyImplementation())
                 .shardSnapshotChunkSize(props.getShardSnapshotChunkSize().getValue().intValue())
                 .useTellBasedProtocol(props.getUseTellBasedProtocol())
+                .syncIndexThreshold(props.getSyncIndexThreshold().getValue())
                 .build();
     }
 
-    public void setBundleContext(BundleContext bundleContext) {
+    public void setBundleContext(final BundleContext bundleContext) {
         this.bundleContext = bundleContext;
     }
 }
index 2e97c7e..44146aa 100644 (file)
@@ -20,14 +20,14 @@ public class DistributedOperationalDataStoreProviderModule
         extends AbstractDistributedOperationalDataStoreProviderModule {
     private BundleContext bundleContext;
 
-    public DistributedOperationalDataStoreProviderModule(ModuleIdentifier identifier,
-            DependencyResolver dependencyResolver) {
+    public DistributedOperationalDataStoreProviderModule(final ModuleIdentifier identifier,
+            final DependencyResolver dependencyResolver) {
         super(identifier, dependencyResolver);
     }
 
-    public DistributedOperationalDataStoreProviderModule(ModuleIdentifier identifier,
-            DependencyResolver dependencyResolver,DistributedOperationalDataStoreProviderModule oldModule,
-            AutoCloseable oldInstance) {
+    public DistributedOperationalDataStoreProviderModule(final ModuleIdentifier identifier,
+            final DependencyResolver dependencyResolver,final DistributedOperationalDataStoreProviderModule oldModule,
+            final AutoCloseable oldInstance) {
         super(identifier, dependencyResolver, oldModule, oldInstance);
     }
 
@@ -37,7 +37,7 @@ public class DistributedOperationalDataStoreProviderModule
     }
 
     @Override
-    public boolean canReuseInstance(AbstractDistributedOperationalDataStoreProviderModule oldModule) {
+    public boolean canReuseInstance(final AbstractDistributedOperationalDataStoreProviderModule oldModule) {
         return true;
     }
 
@@ -55,7 +55,7 @@ public class DistributedOperationalDataStoreProviderModule
         return newDatastoreContext(null);
     }
 
-    private static DatastoreContext newDatastoreContext(OperationalProperties inProps) {
+    private static DatastoreContext newDatastoreContext(final OperationalProperties inProps) {
         OperationalProperties props = inProps;
         if (props == null) {
             props = new OperationalProperties();
@@ -98,10 +98,11 @@ public class DistributedOperationalDataStoreProviderModule
                 .customRaftPolicyImplementation(props.getCustomRaftPolicyImplementation())
                 .shardSnapshotChunkSize(props.getShardSnapshotChunkSize().getValue().intValue())
                 .useTellBasedProtocol(props.getUseTellBasedProtocol())
+                .syncIndexThreshold(props.getSyncIndexThreshold().getValue())
                 .build();
     }
 
-    public void setBundleContext(BundleContext bundleContext) {
+    public void setBundleContext(final BundleContext bundleContext) {
         this.bundleContext = bundleContext;
     }
 
index d14c828..49a51a6 100644 (file)
@@ -248,6 +248,14 @@ module distributed-datastore-provider {
                 is the threshold in terms of number of megabytes before it should switch from storing in memory to
                 buffering to a file.";
         }
+
+        leaf sync-index-threshold {
+            default 10;
+            type non-zero-uint32-type;
+            description "Permitted synchronization lag, expressed in terms of RAFT entry count. It a follower's
+                         commitIndex trails the leader's journal by more than this amount of entries the follower
+                         is considered to be out-of-sync.";
+        }
     }
 
     // Augments the 'configuration' choice node under modules/module.

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.