import scala.concurrent.duration.FiniteDuration;
/**
- * Configuration Parameter interface for configuring the Raft consensus system
- *
- * <p>
- * Any component using this implementation might want to provide an implementation of
- * this interface to configure
- *
- * <p>
- * A default implementation will be used if none is provided.
+ * Configuration Parameter interface for configuring the Raft consensus system. Any component using this implementation
+ * might want to provide an implementation of this interface to configure. A default implementation will be used if none
+ * is provided.
*
* @author Kamal Rameshan
*/
/**
* Returns the percentage of total memory used in the in-memory Raft log before a snapshot should be taken.
+ * Disabled when direct threshold is enabled.
*
* @return the percentage.
*/
int getSnapshotDataThresholdPercentage();
+ /**
+ * Returns the max size of memory used in the in-memory Raft log before a snapshot should be taken. 0 means that
+ * direct threshold is disabled and percentage is used instead.
+ *
+ * @return maximum journal size (in MiB).
+ */
+ int getSnapshotDataThreshold();
/**
- * Returns the interval(in seconds) after which a snapshot should be taken during recovery.
- * Negative value means don't take snapshots.
+ * Returns the interval(in seconds) after which a snapshot should be taken during recovery. Negative value means
+ * do not take snapshots.
*
* @return the interval of recovery snapshot in seconds
*/
*/
long getIsolatedCheckIntervalInMillis();
-
/**
* Returns the multiplication factor to be used to determine the shard election timeout. The election timeout
* is determined by multiplying the election timeout factor with the heart beat duration.
*/
long getElectionTimeoutFactor();
-
/**
* Returns the RaftPolicy used to determine certain Raft behaviors.
*
// in-memory journal can use before it needs to snapshot
private int snapshotDataThresholdPercentage = 12;
+ // max size of in-memory journal in MB
+ // 0 means direct threshold if disabled
+ private int snapshotDataThreshold = 0;
+
private int snapshotChunkSize = SNAPSHOT_CHUNK_SIZE;
private long electionTimeoutFactor = 2;
this.snapshotDataThresholdPercentage = snapshotDataThresholdPercentage;
}
+ public void setSnapshotDataThreshold(final int snapshotDataThreshold) {
+ this.snapshotDataThreshold = snapshotDataThreshold;
+ }
+
public void setSnapshotChunkSize(final int snapshotChunkSize) {
this.snapshotChunkSize = snapshotChunkSize;
}
return snapshotDataThresholdPercentage;
}
+ @Override
+ public int getSnapshotDataThreshold() {
+ return snapshotDataThreshold;
+ }
+
@Override
public int getRecoverySnapshotIntervalSeconds() {
return this.recoverySnapshotIntervalSeconds;
@Override
public boolean shouldCaptureSnapshot(final long logIndex) {
final ConfigParams config = context.getConfigParams();
- final long journalSize = logIndex + 1;
- final long dataThreshold = context.getTotalMemory() * config.getSnapshotDataThresholdPercentage() / 100;
+ if ((logIndex + 1) % config.getSnapshotBatchCount() == 0) {
+ return true;
+ }
- return journalSize % config.getSnapshotBatchCount() == 0 || getDataSizeForSnapshotCheck() > dataThreshold;
+ final long absoluteThreshold = config.getSnapshotDataThreshold();
+ final long dataThreshold = absoluteThreshold != 0 ? absoluteThreshold * ConfigParams.MEGABYTE
+ : context.getTotalMemory() * config.getSnapshotDataThresholdPercentage() / 100;
+ return getDataSizeForSnapshotCheck() > dataThreshold;
}
@Override
log.info("{}: Persisting of snapshot done: {}", persistenceId(), snapshot);
- long dataThreshold = totalMemory * context.getConfigParams().getSnapshotDataThresholdPercentage() / 100;
- boolean dataSizeThresholdExceeded = context.getReplicatedLog().dataSize() > dataThreshold;
+ final ConfigParams config = context.getConfigParams();
+ final long absoluteThreshold = config.getSnapshotDataThreshold();
+ final long dataThreshold = absoluteThreshold != 0 ? absoluteThreshold * ConfigParams.MEGABYTE
+ : totalMemory * config.getSnapshotDataThresholdPercentage() / 100;
- boolean logSizeExceededSnapshotBatchCount =
- context.getReplicatedLog().size() >= context.getConfigParams().getSnapshotBatchCount();
+ final boolean dataSizeThresholdExceeded = context.getReplicatedLog().dataSize() > dataThreshold;
+ final boolean logSizeExceededSnapshotBatchCount =
+ context.getReplicatedLog().size() >= config.getSnapshotBatchCount();
final RaftActorBehavior currentBehavior = context.getCurrentBehavior();
if (dataSizeThresholdExceeded || logSizeExceededSnapshotBatchCount || captureSnapshot.isMandatoryTrim()) {
} else if (logSizeExceededSnapshotBatchCount) {
log.debug("{}: log size {} exceeds the snapshot batch count {} - doing snapshotPreCommit with "
+ "index {}", context.getId(), context.getReplicatedLog().size(),
- context.getConfigParams().getSnapshotBatchCount(),
- captureSnapshot.getLastAppliedIndex());
+ config.getSnapshotBatchCount(), captureSnapshot.getLastAppliedIndex());
} else {
log.debug("{}: user triggered or root overwrite snapshot encountered, trimming log up to "
+ "last applied index {}", context.getId(), captureSnapshot.getLastAppliedIndex());
#shard-snapshot-batch-count=20000
# The percentage of Runtime.totalMemory() used by the in-memory journal log before a snapshot is to be taken.
+# Disabled, if direct threshold is enabled.
#shard-snapshot-data-threshold-percentage=12
+# The max size of in-memory journal(in MB), after reaching the limit, snapshot will be taken. Should be not less then 1.
+# If set to 0, direct threshold is disabled and percentage is used instead.
+#shard-snapshot-data-threshold=0
+
# The interval at which the leader of the shard will check if its majority followers are active and
# term itself as isolated.
#shard-isolated-leader-check-interval-in-millis=5000
public static final boolean DEFAULT_SNAPSHOT_ON_ROOT_OVERWRITE = false;
public static final FileAkkaConfigurationReader DEFAULT_CONFIGURATION_READER = new FileAkkaConfigurationReader();
public static final int DEFAULT_SHARD_SNAPSHOT_DATA_THRESHOLD_PERCENTAGE = 12;
+ public static final int DEFAULT_SHARD_SNAPSHOT_DATA_THRESHOLD = 0;
public static final int DEFAULT_SHARD_ELECTION_TIMEOUT_FACTOR = 2;
public static final int DEFAULT_SHARD_CANDIDATE_ELECTION_TIMEOUT_DIVISOR = 1;
public static final int DEFAULT_TX_CREATION_INITIAL_RATE_LIMIT = 100;
setHeartbeatInterval(DEFAULT_HEARTBEAT_INTERVAL_IN_MILLIS);
setIsolatedLeaderCheckInterval(DEFAULT_ISOLATED_LEADER_CHECK_INTERVAL_IN_MILLIS);
setSnapshotDataThresholdPercentage(DEFAULT_SHARD_SNAPSHOT_DATA_THRESHOLD_PERCENTAGE);
+ setSnapshotDataThreshold(DEFAULT_SHARD_SNAPSHOT_DATA_THRESHOLD);
setElectionTimeoutFactor(DEFAULT_SHARD_ELECTION_TIMEOUT_FACTOR);
setCandidateElectionTimeoutDivisor(DEFAULT_SHARD_CANDIDATE_ELECTION_TIMEOUT_DIVISOR);
setSyncIndexThreshold(DEFAULT_SYNC_INDEX_THRESHOLD);
setHeartbeatInterval(other.raftConfig.getHeartBeatInterval().toMillis());
setIsolatedLeaderCheckInterval(other.raftConfig.getIsolatedCheckIntervalInMillis());
setSnapshotDataThresholdPercentage(other.raftConfig.getSnapshotDataThresholdPercentage());
+ setSnapshotDataThreshold(other.raftConfig.getSnapshotDataThreshold());
setElectionTimeoutFactor(other.raftConfig.getElectionTimeoutFactor());
setCandidateElectionTimeoutDivisor(other.raftConfig.getCandidateElectionTimeoutDivisor());
setCustomRaftPolicyImplementation(other.raftConfig.getCustomRaftPolicyImplementationClass());
raftConfig.setSnapshotDataThresholdPercentage(shardSnapshotDataThresholdPercentage);
}
+ private void setSnapshotDataThreshold(final int shardSnapshotDataThreshold) {
+ checkArgument(shardSnapshotDataThreshold >= 0);
+ raftConfig.setSnapshotDataThreshold(shardSnapshotDataThreshold);
+ }
+
private void setSnapshotBatchCount(final long shardSnapshotBatchCount) {
raftConfig.setSnapshotBatchCount(shardSnapshotBatchCount);
}
return this;
}
+ public Builder shardSnapshotDataThreshold(final int shardSnapshotDataThreshold) {
+ datastoreContext.setSnapshotDataThreshold(shardSnapshotDataThreshold);
+ return this;
+ }
+
public Builder shardHeartbeatIntervalInMillis(final int shardHeartbeatIntervalInMillis) {
datastoreContext.setHeartbeatInterval(shardHeartbeatIntervalInMillis);
return this;
int getShardSnapshotDataThresholdPercentage();
+ int getShardSnapshotDataThreshold();
+
long getShardSnapshotBatchCount();
long getShardTransactionCommitTimeoutInSeconds();
return context.getShardRaftConfig().getSnapshotDataThresholdPercentage();
}
+ @Override
+ public int getShardSnapshotDataThreshold() {
+ return context.getShardRaftConfig().getSnapshotDataThreshold();
+ }
+
@Override
public long getShardSnapshotBatchCount() {
return context.getShardRaftConfig().getSnapshotBatchCount();
leaf shard-snapshot-data-threshold-percentage {
default 12;
type percentage;
- description "The percentage of Runtime.maxMemory() used by the in-memory journal log before a snapshot is to be taken";
+ description "The percentage of Runtime.maxMemory() used by the in-memory journal log before a snapshot is to be taken.
+ Disabled, if direct threshold is enabled.";
+ }
+
+ leaf shard-snapshot-data-threshold {
+ default 0;
+ type uint32 {
+ range "0..max";
+ }
+ description "The threshold of in-memory journal size before a snapshot is to be taken. If set to 0, direct threshold
+ is disabled and percentage is used instead.";
}
import static org.opendaylight.controller.cluster.datastore.DatastoreContext.DEFAULT_HEARTBEAT_INTERVAL_IN_MILLIS;
import static org.opendaylight.controller.cluster.datastore.DatastoreContext.DEFAULT_OPERATION_TIMEOUT_IN_MS;
import static org.opendaylight.controller.cluster.datastore.DatastoreContext.DEFAULT_SHARD_INITIALIZATION_TIMEOUT;
+import static org.opendaylight.controller.cluster.datastore.DatastoreContext.DEFAULT_SHARD_SNAPSHOT_DATA_THRESHOLD;
import static org.opendaylight.controller.cluster.datastore.DatastoreContext.DEFAULT_SHARD_SNAPSHOT_DATA_THRESHOLD_PERCENTAGE;
import static org.opendaylight.controller.cluster.datastore.DatastoreContext.DEFAULT_SHARD_TRANSACTION_IDLE_TIMEOUT;
import static org.opendaylight.controller.cluster.datastore.DatastoreContext.DEFAULT_SHARD_TX_COMMIT_TIMEOUT_IN_SECONDS;
properties.put("recovery-snapshot-interval-seconds", "360");
properties.put("shard-isolated-leader-check-interval-in-millis", "123");
properties.put("shard-snapshot-data-threshold-percentage", "100");
+ properties.put("shard-snapshot-data-threshold", "800");
properties.put("shard-election-timeout-factor", "21");
properties.put("shard-batched-modification-count", "901");
properties.put("transactionCreationInitialRateLimit", "200");
assertEquals(360, context.getShardRaftConfig().getRecoverySnapshotIntervalSeconds());
assertEquals(123, context.getShardRaftConfig().getIsolatedCheckIntervalInMillis());
assertEquals(100, context.getShardRaftConfig().getSnapshotDataThresholdPercentage());
+ assertEquals(800, context.getShardRaftConfig().getSnapshotDataThreshold());
assertEquals(21, context.getShardRaftConfig().getElectionTimeoutFactor());
assertEquals(901, context.getShardBatchedModificationCount());
assertEquals(200, context.getTransactionCreationInitialRateLimit());
assertEquals(6, context.getInitialSettleTimeoutMultiplier());
assertEquals(123, context.getShardRaftConfig().getIsolatedCheckIntervalInMillis());
assertEquals(100, context.getShardRaftConfig().getSnapshotDataThresholdPercentage());
+ assertEquals(800, context.getShardRaftConfig().getSnapshotDataThreshold());
assertEquals(22, context.getShardRaftConfig().getElectionTimeoutFactor());
assertEquals(200, context.getTransactionCreationInitialRateLimit());
assertTrue(context.isPersistent());
properties.put("shard-heartbeat-interval-in-millis", "99"); // bad - must be >= 100
properties.put("shard-transaction-commit-queue-capacity", "567"); // good
properties.put("shard-snapshot-data-threshold-percentage", "101"); // bad - must be 0-100
+ properties.put("shard-snapshot-data-threshold", "-1"); // bad - must be > 0
properties.put("shard-initialization-timeout-in-seconds", "-1"); // bad - must be > 0
properties.put("max-shard-data-change-executor-pool-size", "bogus"); // bad - NaN
properties.put("unknownProperty", "1"); // bad - invalid property name
assertEquals(567, context.getShardTransactionCommitQueueCapacity());
assertEquals(DEFAULT_SHARD_SNAPSHOT_DATA_THRESHOLD_PERCENTAGE,
context.getShardRaftConfig().getSnapshotDataThresholdPercentage());
+ assertEquals(DEFAULT_SHARD_SNAPSHOT_DATA_THRESHOLD, context.getShardRaftConfig().getSnapshotDataThreshold());
assertEquals(DEFAULT_SHARD_INITIALIZATION_TIMEOUT, context.getShardInitializationTimeout());
}
import static org.opendaylight.controller.cluster.datastore.DatastoreContext.DEFAULT_SHARD_ELECTION_TIMEOUT_FACTOR;
import static org.opendaylight.controller.cluster.datastore.DatastoreContext.DEFAULT_SHARD_INITIALIZATION_TIMEOUT;
import static org.opendaylight.controller.cluster.datastore.DatastoreContext.DEFAULT_SHARD_LEADER_ELECTION_TIMEOUT;
+import static org.opendaylight.controller.cluster.datastore.DatastoreContext.DEFAULT_SHARD_SNAPSHOT_DATA_THRESHOLD;
import static org.opendaylight.controller.cluster.datastore.DatastoreContext.DEFAULT_SHARD_SNAPSHOT_DATA_THRESHOLD_PERCENTAGE;
import static org.opendaylight.controller.cluster.datastore.DatastoreContext.DEFAULT_SHARD_TRANSACTION_IDLE_TIMEOUT;
import static org.opendaylight.controller.cluster.datastore.DatastoreContext.DEFAULT_SHARD_TX_COMMIT_QUEUE_CAPACITY;
context.getShardRaftConfig().getIsolatedCheckIntervalInMillis());
assertEquals(DEFAULT_SHARD_SNAPSHOT_DATA_THRESHOLD_PERCENTAGE,
context.getShardRaftConfig().getSnapshotDataThresholdPercentage());
+ assertEquals(DEFAULT_SHARD_SNAPSHOT_DATA_THRESHOLD,
+ context.getShardRaftConfig().getSnapshotDataThreshold());
assertEquals(DEFAULT_SHARD_ELECTION_TIMEOUT_FACTOR, context.getShardRaftConfig().getElectionTimeoutFactor());
assertEquals(DEFAULT_TX_CREATION_INITIAL_RATE_LIMIT, context.getTransactionCreationInitialRateLimit());
assertEquals(DatastoreContext.DEFAULT_SHARD_BATCHED_MODIFICATION_COUNT,
builder.persistent(!DEFAULT_PERSISTENT);
builder.shardIsolatedLeaderCheckIntervalInMillis(DEFAULT_ISOLATED_LEADER_CHECK_INTERVAL_IN_MILLIS + 1);
builder.shardSnapshotDataThresholdPercentage(DEFAULT_SHARD_SNAPSHOT_DATA_THRESHOLD_PERCENTAGE + 1);
+ builder.shardSnapshotDataThreshold(DEFAULT_SHARD_SNAPSHOT_DATA_THRESHOLD + 1);
builder.shardElectionTimeoutFactor(DEFAULT_SHARD_ELECTION_TIMEOUT_FACTOR + 1);
builder.transactionCreationInitialRateLimit(DEFAULT_TX_CREATION_INITIAL_RATE_LIMIT + 1);
builder.shardBatchedModificationCount(DEFAULT_SHARD_BATCHED_MODIFICATION_COUNT + 1);
context.getShardRaftConfig().getIsolatedCheckIntervalInMillis());
assertEquals(DEFAULT_SHARD_SNAPSHOT_DATA_THRESHOLD_PERCENTAGE + 1,
context.getShardRaftConfig().getSnapshotDataThresholdPercentage());
+ assertEquals(DEFAULT_SHARD_SNAPSHOT_DATA_THRESHOLD + 1,
+ context.getShardRaftConfig().getSnapshotDataThreshold());
assertEquals(DEFAULT_SHARD_ELECTION_TIMEOUT_FACTOR + 1,
context.getShardRaftConfig().getElectionTimeoutFactor());
assertEquals(DEFAULT_TX_CREATION_INITIAL_RATE_LIMIT + 1, context.getTransactionCreationInitialRateLimit());