X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FTransactionRateLimitingCallback.java;h=f35e30a06ccb1c8a95b4ff843ac22abedc21d168;hp=526ce59aabe370246b2742bf068d25b9befcc764;hb=925cb4a228d0fda99c7bfeb432eb25285a223887;hpb=a54ec60368110d22794602343c934902f6833c65 diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionRateLimitingCallback.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionRateLimitingCallback.java index 526ce59aab..f35e30a06c 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionRateLimitingCallback.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionRateLimitingCallback.java @@ -8,47 +8,63 @@ package org.opendaylight.controller.cluster.datastore; -import com.codahale.metrics.Snapshot; import com.codahale.metrics.Timer; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import com.google.common.base.Ticker; import java.util.concurrent.TimeUnit; import org.opendaylight.controller.cluster.datastore.utils.ActorContext; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * TransactionRateLimitingCallback computes the new transaction rate limit on the successful completion of a - * transaction + * transaction. */ -public class TransactionRateLimitingCallback implements OperationCallback{ +public class TransactionRateLimitingCallback implements OperationCallback { + private static Ticker TICKER = Ticker.systemTicker(); - private static final Logger LOG = LoggerFactory.getLogger(TransactionRateLimitingCallback.class); - private static final String COMMIT = "commit"; + private enum State { + STOPPED, + RUNNING, + PAUSED + } private final Timer commitTimer; - private final ActorContext actorContext; - private Timer.Context timerContext; + private long startTime; + private long elapsedTime; + private volatile State state = State.STOPPED; - TransactionRateLimitingCallback(ActorContext actorContext){ - this.actorContext = actorContext; - commitTimer = actorContext.getOperationTimer(COMMIT); + TransactionRateLimitingCallback(ActorContext actorContext) { + commitTimer = actorContext.getOperationTimer(ActorContext.COMMIT); } @Override public void run() { - timerContext = commitTimer.time(); + Preconditions.checkState(state == State.STOPPED, "state is not STOPPED"); + resume(); } @Override - public void success() { - Preconditions.checkState(timerContext != null, "Call run before success"); - timerContext.stop(); - - double newRateLimit = calculateNewRateLimit(commitTimer, actorContext.getDatastoreContext()); + public void pause() { + if (state == State.RUNNING) { + elapsedTime += TICKER.read() - startTime; + state = State.PAUSED; + } + } - LOG.debug("Data Store {} commit rateLimit adjusted to {}", actorContext.getDataStoreType(), newRateLimit); + @Override + public void resume() { + if (state != State.RUNNING) { + startTime = TICKER.read(); + state = State.RUNNING; + } + } - actorContext.setTxCreationLimit(newRateLimit); + @Override + public void success() { + Preconditions.checkState(state != State.STOPPED, "state is STOPPED"); + pause(); + commitTimer.update(elapsedTime, TimeUnit.NANOSECONDS); + state = State.STOPPED; } @Override @@ -58,66 +74,8 @@ public class TransactionRateLimitingCallback implements OperationCallback{ // not going to be useful - so we leave it as it is } - private static double calculateNewRateLimit(Timer commitTimer, DatastoreContext context) { - if(commitTimer == null) { - // This can happen in unit tests. - return 0; - } - - Snapshot timerSnapshot = commitTimer.getSnapshot(); - double newRateLimit = 0; - - long commitTimeoutInSeconds = context.getShardTransactionCommitTimeoutInSeconds(); - long commitTimeoutInNanos = TimeUnit.SECONDS.toNanos(commitTimeoutInSeconds); - - // Find the time that it takes for transactions to get executed in every 10th percentile - // Compute the rate limit for that percentile and sum it up - for(int i=1;i<=10;i++){ - // Get the amount of time transactions take in the i*10th percentile - double percentileTimeInNanos = timerSnapshot.getValue(i * 0.1D); - - if(percentileTimeInNanos > 0) { - // Figure out the rate limit for the i*10th percentile in nanos - double percentileRateLimit = (commitTimeoutInNanos / percentileTimeInNanos); - - // Add the percentileRateLimit to the total rate limit - newRateLimit += percentileRateLimit; - } - } - - // Compute the rate limit per second - return newRateLimit/(commitTimeoutInSeconds*10); - } - - public static void adjustRateLimitForUnusedTransaction(ActorContext actorContext) { - // Unused transactions in one data store can artificially limit the rate for other data stores - // if the first data store's rate is still at a lower initial rate since the front-end creates - // transactions in each data store up-front even though the client may not actually submit changes. - // So we may have to adjust the rate for data stores with unused transactions. - - // First calculate the current rate for the data store. If it's 0 then there have been no - // actual transactions committed to the data store. - - double newRateLimit = calculateNewRateLimit(actorContext.getOperationTimer(COMMIT), - actorContext.getDatastoreContext()); - if(newRateLimit == 0.0) { - // Since we have no rate data for unused Tx's data store, adjust to the rate from another - // data store that does have rate data. - for(String datastoreType: DatastoreContext.getGlobalDatastoreTypes()) { - if(datastoreType.equals(actorContext.getDataStoreType())) { - continue; - } - - newRateLimit = calculateNewRateLimit(actorContext.getOperationTimer(datastoreType, COMMIT), - actorContext.getDatastoreContext()); - if(newRateLimit > 0.0) { - LOG.debug("On unused Tx - data Store {} commit rateLimit adjusted to {}", - actorContext.getDataStoreType(), newRateLimit); - - actorContext.setTxCreationLimit(newRateLimit); - break; - } - } - } + @VisibleForTesting + static void setTicker(Ticker ticker) { + TICKER = ticker; } -} \ No newline at end of file +}