+ List<Future<ActorSelection>> getCohortFutures() {
+ return Collections.unmodifiableList(cohortFutures);
+ }
+
+ private static interface OperationCallback {
+ void run();
+ void success();
+ void failure();
+ }
+
+ private static class CommitCallback implements OperationCallback{
+
+ private static final Logger LOG = LoggerFactory.getLogger(CommitCallback.class);
+ private static final String COMMIT = "commit";
+
+ private final Timer commitTimer;
+ private final ActorContext actorContext;
+ private Timer.Context timerContext;
+
+ CommitCallback(ActorContext actorContext){
+ this.actorContext = actorContext;
+ commitTimer = actorContext.getOperationTimer(COMMIT);
+ }
+
+ @Override
+ public void run() {
+ timerContext = commitTimer.time();
+ }
+
+ @Override
+ public void success() {
+ timerContext.stop();
+
+ Snapshot timerSnapshot = commitTimer.getSnapshot();
+ double allowedLatencyInNanos = timerSnapshot.get98thPercentile();
+
+ long commitTimeoutInSeconds = actorContext.getDatastoreContext()
+ .getShardTransactionCommitTimeoutInSeconds();
+ long commitTimeoutInNanos = TimeUnit.SECONDS.toNanos(commitTimeoutInSeconds);
+
+ // Here we are trying to find out how many transactions per second are allowed
+ double newRateLimit = ((double) commitTimeoutInNanos / allowedLatencyInNanos) / commitTimeoutInSeconds;
+
+ LOG.debug("Data Store {} commit rateLimit adjusted to {} allowedLatencyInNanos = {}",
+ actorContext.getDataStoreType(), newRateLimit, allowedLatencyInNanos);
+
+ actorContext.setTxCreationLimit(newRateLimit);
+ }
+
+ @Override
+ public void failure() {
+ // This would mean we couldn't get a transaction completed in 30 seconds which is
+ // the default transaction commit timeout. Using the timeout information to figure out the rate limit is
+ // not going to be useful - so we leave it as it is
+ }