Preconditions.checkState(timerContext != null, "Call run before success");
timerContext.stop();
+ double newRateLimit = calculateNewRateLimit(commitTimer, actorContext.getDatastoreContext());
+
+ LOG.debug("Data Store {} commit rateLimit adjusted to {}", actorContext.getDataStoreType(), newRateLimit);
+
+ actorContext.setTxCreationLimit(newRateLimit);
+ }
+
+ @Override
+ public void failure() {
+ // This would mean we couldn't get a transaction completed in 30 seconds which is
+ // the default transaction commit timeout. Using the timeout information to figure out the rate limit is
+ // not going to be useful - so we leave it as it is
+ }
+
+ private static double calculateNewRateLimit(Timer commitTimer, DatastoreContext context) {
+ if(commitTimer == null) {
+ // This can happen in unit tests.
+ return 0;
+ }
+
Snapshot timerSnapshot = commitTimer.getSnapshot();
double newRateLimit = 0;
- long commitTimeoutInSeconds = actorContext.getDatastoreContext()
- .getShardTransactionCommitTimeoutInSeconds();
+ long commitTimeoutInSeconds = context.getShardTransactionCommitTimeoutInSeconds();
long commitTimeoutInNanos = TimeUnit.SECONDS.toNanos(commitTimeoutInSeconds);
// Find the time that it takes for transactions to get executed in every 10th percentile
if(percentileTimeInNanos > 0) {
// Figure out the rate limit for the i*10th percentile in nanos
- double percentileRateLimit = ((double) commitTimeoutInNanos / percentileTimeInNanos);
+ double percentileRateLimit = (commitTimeoutInNanos / percentileTimeInNanos);
// Add the percentileRateLimit to the total rate limit
newRateLimit += percentileRateLimit;
}
// Compute the rate limit per second
- newRateLimit = newRateLimit/(commitTimeoutInSeconds*10);
-
- LOG.debug("Data Store {} commit rateLimit adjusted to {}", actorContext.getDataStoreType(), newRateLimit);
-
- actorContext.setTxCreationLimit(newRateLimit);
+ return newRateLimit/(commitTimeoutInSeconds*10);
}
- @Override
- public void failure() {
- // This would mean we couldn't get a transaction completed in 30 seconds which is
- // the default transaction commit timeout. Using the timeout information to figure out the rate limit is
- // not going to be useful - so we leave it as it is
+ public static void adjustRateLimitForUnusedTransaction(ActorContext actorContext) {
+ // Unused transactions in one data store can artificially limit the rate for other data stores
+ // if the first data store's rate is still at a lower initial rate since the front-end creates
+ // transactions in each data store up-front even though the client may not actually submit changes.
+ // So we may have to adjust the rate for data stores with unused transactions.
+
+ // First calculate the current rate for the data store. If it's 0 then there have been no
+ // actual transactions committed to the data store.
+
+ double newRateLimit = calculateNewRateLimit(actorContext.getOperationTimer(COMMIT),
+ actorContext.getDatastoreContext());
+ if(newRateLimit == 0.0) {
+ // Since we have no rate data for unused Tx's data store, adjust to the rate from another
+ // data store that does have rate data.
+ for(String datastoreType: DatastoreContext.getGlobalDatastoreTypes()) {
+ if(datastoreType.equals(actorContext.getDataStoreType())) {
+ continue;
+ }
+
+ newRateLimit = calculateNewRateLimit(actorContext.getOperationTimer(datastoreType, COMMIT),
+ actorContext.getDatastoreContext());
+ if(newRateLimit > 0.0) {
+ LOG.debug("On unused Tx - data Store {} commit rateLimit adjusted to {}",
+ actorContext.getDataStoreType(), newRateLimit);
+
+ actorContext.setTxCreationLimit(newRateLimit);
+ break;
+ }
+ }
+ }
}
}
\ No newline at end of file