X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FThreePhaseCommitCohortProxy.java;h=c479da73127760977d4c27b3ff9873d10c295c57;hp=4f472266c1f56acbd8fc531ae7189f1ae91951b4;hb=61722bffcf885c04d11f684f2f03b09fa96f2002;hpb=4b207b5356775c4b4d231ae979f9f2134f617dd1 diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxy.java index 4f472266c1..c479da7312 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxy.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxy.java @@ -11,15 +11,12 @@ package org.opendaylight.controller.cluster.datastore; import akka.actor.ActorSelection; import akka.dispatch.Futures; import akka.dispatch.OnComplete; -import com.codahale.metrics.Snapshot; -import com.codahale.metrics.Timer; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Lists; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; import java.util.Collections; import java.util.List; -import java.util.concurrent.TimeUnit; import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction; import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply; import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction; @@ -71,7 +68,7 @@ public class ThreePhaseCommitCohortProxy implements DOMStoreThreePhaseCommitCoho private Future buildCohortList() { Future> combinedFutures = Futures.sequence(cohortFutures, - actorContext.getActorSystem().dispatcher()); + actorContext.getClientDispatcher()); return combinedFutures.transform(new AbstractFunction1, Void>() { @Override @@ -83,7 +80,7 @@ public class ThreePhaseCommitCohortProxy implements DOMStoreThreePhaseCommitCoho } return null; } - }, TransactionProxy.SAME_FAILURE_TRANSFORMER, actorContext.getActorSystem().dispatcher()); + }, TransactionProxy.SAME_FAILURE_TRANSFORMER, actorContext.getClientDispatcher()); } @Override @@ -111,7 +108,7 @@ public class ThreePhaseCommitCohortProxy implements DOMStoreThreePhaseCommitCoho finishCanCommit(returnFuture); } } - }, actorContext.getActorSystem().dispatcher()); + }, actorContext.getClientDispatcher()); return returnFuture; } @@ -158,7 +155,7 @@ public class ThreePhaseCommitCohortProxy implements DOMStoreThreePhaseCommitCoho } returnFuture.set(Boolean.valueOf(result)); } - }, actorContext.getActorSystem().dispatcher()); + }, actorContext.getClientDispatcher()); } private Future> invokeCohorts(Object message) { @@ -170,7 +167,7 @@ public class ThreePhaseCommitCohortProxy implements DOMStoreThreePhaseCommitCoho futureList.add(actorContext.executeOperationAsync(cohort, message, actorContext.getTransactionCommitOperationTimeout())); } - return Futures.sequence(futureList, actorContext.getActorSystem().dispatcher()); + return Futures.sequence(futureList, actorContext.getClientDispatcher()); } @Override @@ -195,7 +192,7 @@ public class ThreePhaseCommitCohortProxy implements DOMStoreThreePhaseCommitCoho @Override public ListenableFuture commit() { OperationCallback operationCallback = (cohortFutures.size() == 0) ? NO_OP_CALLBACK : - new CommitCallback(actorContext); + new TransactionRateLimitingCallback(actorContext); return voidOperation("commit", new CommitTransaction(transactionId).toSerializable(), CommitTransactionReply.SERIALIZABLE_CLASS, true, operationCallback); @@ -239,7 +236,7 @@ public class ThreePhaseCommitCohortProxy implements DOMStoreThreePhaseCommitCoho propagateException, returnFuture, callback); } } - }, actorContext.getActorSystem().dispatcher()); + }, actorContext.getClientDispatcher()); } return returnFuture; @@ -304,65 +301,11 @@ public class ThreePhaseCommitCohortProxy implements DOMStoreThreePhaseCommitCoho callback.success(); } } - }, actorContext.getActorSystem().dispatcher()); + }, actorContext.getClientDispatcher()); } @VisibleForTesting List> getCohortFutures() { return Collections.unmodifiableList(cohortFutures); } - - private static interface OperationCallback { - void run(); - void success(); - void failure(); - } - - private static class CommitCallback implements OperationCallback{ - - private static final Logger LOG = LoggerFactory.getLogger(CommitCallback.class); - private static final String COMMIT = "commit"; - - private final Timer commitTimer; - private final ActorContext actorContext; - private Timer.Context timerContext; - - CommitCallback(ActorContext actorContext){ - this.actorContext = actorContext; - commitTimer = actorContext.getOperationTimer(COMMIT); - } - - @Override - public void run() { - timerContext = commitTimer.time(); - } - - @Override - public void success() { - timerContext.stop(); - - Snapshot timerSnapshot = commitTimer.getSnapshot(); - double allowedLatencyInNanos = timerSnapshot.get98thPercentile(); - - long commitTimeoutInSeconds = actorContext.getDatastoreContext() - .getShardTransactionCommitTimeoutInSeconds(); - long commitTimeoutInNanos = TimeUnit.SECONDS.toNanos(commitTimeoutInSeconds); - - // Here we are trying to find out how many transactions per second are allowed - double newRateLimit = ((double) commitTimeoutInNanos / allowedLatencyInNanos) / commitTimeoutInSeconds; - - LOG.debug("Data Store {} commit rateLimit adjusted to {} allowedLatencyInNanos = {}", - actorContext.getDataStoreType(), newRateLimit, allowedLatencyInNanos); - - actorContext.setTxCreationLimit(newRateLimit); - } - - @Override - public void failure() { - // This would mean we couldn't get a transaction completed in 30 seconds which is - // the default transaction commit timeout. Using the timeout information to figure out the rate limit is - // not going to be useful - so we leave it as it is - } - } - }