-
- private static interface OperationCallback {
- void run();
- void success();
- void failure();
- }
-
- private static class CommitCallback implements OperationCallback{
-
- private static final Logger LOG = LoggerFactory.getLogger(CommitCallback.class);
- private static final String COMMIT = "commit";
-
- private final Timer commitTimer;
- private final ActorContext actorContext;
- private Timer.Context timerContext;
-
- CommitCallback(ActorContext actorContext){
- this.actorContext = actorContext;
- commitTimer = actorContext.getOperationTimer(COMMIT);
- }
-
- @Override
- public void run() {
- timerContext = commitTimer.time();
- }
-
- @Override
- public void success() {
- timerContext.stop();
-
- Snapshot timerSnapshot = commitTimer.getSnapshot();
- double allowedLatencyInNanos = timerSnapshot.get95thPercentile();
-
- long commitTimeoutInSeconds = actorContext.getDatastoreContext()
- .getShardTransactionCommitTimeoutInSeconds();
- long commitTimeoutInNanos = TimeUnit.SECONDS.toNanos(commitTimeoutInSeconds);
-
- // Here we are trying to find out how many transactions per second are allowed
- double newRateLimit = ((double) commitTimeoutInNanos / allowedLatencyInNanos) / commitTimeoutInSeconds;
-
- LOG.debug("Data Store {} commit rateLimit adjusted to {} allowedLatencyInNanos = {}",
- actorContext.getDataStoreType(), newRateLimit, allowedLatencyInNanos);
-
- actorContext.setTxCreationLimit(newRateLimit);
- }
-
- @Override
- public void failure() {
- // This would mean we couldn't get a transaction completed in 30 seconds which is
- // the default transaction commit timeout. Using the timeout information to figure out the rate limit is
- // not going to be useful - so we leave it as it is
- }
- }
-