package org.opendaylight.controller.cluster.datastore;
-import com.codahale.metrics.Snapshot;
import com.codahale.metrics.Timer;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
+import com.google.common.base.Ticker;
import java.util.concurrent.TimeUnit;
-import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
/**
* TransactionRateLimitingCallback computes the new transaction rate limit on the successful completion of a
- * transaction
+ * transaction.
*/
-public class TransactionRateLimitingCallback implements OperationCallback{
+public class TransactionRateLimitingCallback implements OperationCallback {
+ private static Ticker TICKER = Ticker.systemTicker();
- private static final Logger LOG = LoggerFactory.getLogger(TransactionRateLimitingCallback.class);
- private static final String COMMIT = "commit";
+ private enum State {
+ STOPPED,
+ RUNNING,
+ PAUSED
+ }
private final Timer commitTimer;
- private final ActorContext actorContext;
- private Timer.Context timerContext;
+ private long startTime;
+ private long elapsedTime;
+ private volatile State state = State.STOPPED;
- TransactionRateLimitingCallback(ActorContext actorContext){
- this.actorContext = actorContext;
- commitTimer = actorContext.getOperationTimer(COMMIT);
+ TransactionRateLimitingCallback(ActorUtils actorUtils) {
+ commitTimer = actorUtils.getOperationTimer(ActorUtils.COMMIT);
}
@Override
public void run() {
- timerContext = commitTimer.time();
+ Preconditions.checkState(state == State.STOPPED, "state is not STOPPED");
+ resume();
}
@Override
- public void success() {
- Preconditions.checkState(timerContext != null, "Call run before success");
- timerContext.stop();
-
- double newRateLimit = calculateNewRateLimit(commitTimer, actorContext.getDatastoreContext());
+ public void pause() {
+ if (state == State.RUNNING) {
+ elapsedTime += TICKER.read() - startTime;
+ state = State.PAUSED;
+ }
+ }
- LOG.debug("Data Store {} commit rateLimit adjusted to {}", actorContext.getDataStoreType(), newRateLimit);
+ @Override
+ public void resume() {
+ if (state != State.RUNNING) {
+ startTime = TICKER.read();
+ state = State.RUNNING;
+ }
+ }
- actorContext.setTxCreationLimit(newRateLimit);
+ @Override
+ public void success() {
+ Preconditions.checkState(state != State.STOPPED, "state is STOPPED");
+ pause();
+ commitTimer.update(elapsedTime, TimeUnit.NANOSECONDS);
+ state = State.STOPPED;
}
@Override
// not going to be useful - so we leave it as it is
}
- private static double calculateNewRateLimit(Timer commitTimer, DatastoreContext context) {
- if(commitTimer == null) {
- // This can happen in unit tests.
- return 0;
- }
-
- Snapshot timerSnapshot = commitTimer.getSnapshot();
- double newRateLimit = 0;
-
- long commitTimeoutInSeconds = context.getShardTransactionCommitTimeoutInSeconds();
- long commitTimeoutInNanos = TimeUnit.SECONDS.toNanos(commitTimeoutInSeconds);
-
- // Find the time that it takes for transactions to get executed in every 10th percentile
- // Compute the rate limit for that percentile and sum it up
- for(int i=1;i<=10;i++){
- // Get the amount of time transactions take in the i*10th percentile
- double percentileTimeInNanos = timerSnapshot.getValue(i * 0.1D);
-
- if(percentileTimeInNanos > 0) {
- // Figure out the rate limit for the i*10th percentile in nanos
- double percentileRateLimit = (commitTimeoutInNanos / percentileTimeInNanos);
-
- // Add the percentileRateLimit to the total rate limit
- newRateLimit += percentileRateLimit;
- }
- }
-
- // Compute the rate limit per second
- return newRateLimit/(commitTimeoutInSeconds*10);
- }
-
- public static void adjustRateLimitForUnusedTransaction(ActorContext actorContext) {
- // Unused transactions in one data store can artificially limit the rate for other data stores
- // if the first data store's rate is still at a lower initial rate since the front-end creates
- // transactions in each data store up-front even though the client may not actually submit changes.
- // So we may have to adjust the rate for data stores with unused transactions.
-
- // First calculate the current rate for the data store. If it's 0 then there have been no
- // actual transactions committed to the data store.
-
- double newRateLimit = calculateNewRateLimit(actorContext.getOperationTimer(COMMIT),
- actorContext.getDatastoreContext());
- if(newRateLimit == 0.0) {
- // Since we have no rate data for unused Tx's data store, adjust to the rate from another
- // data store that does have rate data.
- for(String datastoreType: DatastoreContext.getGlobalDatastoreTypes()) {
- if(datastoreType.equals(actorContext.getDataStoreType())) {
- continue;
- }
-
- newRateLimit = calculateNewRateLimit(actorContext.getOperationTimer(datastoreType, COMMIT),
- actorContext.getDatastoreContext());
- if(newRateLimit > 0.0) {
- LOG.debug("On unused Tx - data Store {} commit rateLimit adjusted to {}",
- actorContext.getDataStoreType(), newRateLimit);
-
- actorContext.setTxCreationLimit(newRateLimit);
- break;
- }
- }
- }
+ @VisibleForTesting
+ static void setTicker(Ticker ticker) {
+ TICKER = ticker;
}
-}
\ No newline at end of file
+}