BUG 2762 : Compute rate limit based on a more lenient algorithm
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / ThreePhaseCommitCohortProxy.java
index c51ea80726e54d9cf656e193ca8521d242206c76..c479da73127760977d4c27b3ff9873d10c295c57 100644 (file)
@@ -11,15 +11,12 @@ package org.opendaylight.controller.cluster.datastore;
 import akka.actor.ActorSelection;
 import akka.dispatch.Futures;
 import akka.dispatch.OnComplete;
-import com.codahale.metrics.Snapshot;
-import com.codahale.metrics.Timer;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.SettableFuture;
 import java.util.Collections;
 import java.util.List;
-import java.util.concurrent.TimeUnit;
 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
@@ -71,7 +68,7 @@ public class ThreePhaseCommitCohortProxy implements DOMStoreThreePhaseCommitCoho
     private Future<Void> buildCohortList() {
 
         Future<Iterable<ActorSelection>> combinedFutures = Futures.sequence(cohortFutures,
-                actorContext.getActorSystem().dispatcher());
+                actorContext.getClientDispatcher());
 
         return combinedFutures.transform(new AbstractFunction1<Iterable<ActorSelection>, Void>() {
             @Override
@@ -83,7 +80,7 @@ public class ThreePhaseCommitCohortProxy implements DOMStoreThreePhaseCommitCoho
                 }
                 return null;
             }
-        }, TransactionProxy.SAME_FAILURE_TRANSFORMER, actorContext.getActorSystem().dispatcher());
+        }, TransactionProxy.SAME_FAILURE_TRANSFORMER, actorContext.getClientDispatcher());
     }
 
     @Override
@@ -111,7 +108,7 @@ public class ThreePhaseCommitCohortProxy implements DOMStoreThreePhaseCommitCoho
                     finishCanCommit(returnFuture);
                 }
             }
-        }, actorContext.getActorSystem().dispatcher());
+        }, actorContext.getClientDispatcher());
 
         return returnFuture;
     }
@@ -158,7 +155,7 @@ public class ThreePhaseCommitCohortProxy implements DOMStoreThreePhaseCommitCoho
                 }
                 returnFuture.set(Boolean.valueOf(result));
             }
-        }, actorContext.getActorSystem().dispatcher());
+        }, actorContext.getClientDispatcher());
     }
 
     private Future<Iterable<Object>> invokeCohorts(Object message) {
@@ -170,7 +167,7 @@ public class ThreePhaseCommitCohortProxy implements DOMStoreThreePhaseCommitCoho
             futureList.add(actorContext.executeOperationAsync(cohort, message, actorContext.getTransactionCommitOperationTimeout()));
         }
 
-        return Futures.sequence(futureList, actorContext.getActorSystem().dispatcher());
+        return Futures.sequence(futureList, actorContext.getClientDispatcher());
     }
 
     @Override
@@ -195,7 +192,7 @@ public class ThreePhaseCommitCohortProxy implements DOMStoreThreePhaseCommitCoho
     @Override
     public ListenableFuture<Void> commit() {
         OperationCallback operationCallback = (cohortFutures.size() == 0) ? NO_OP_CALLBACK :
-                new CommitCallback(actorContext);
+                new TransactionRateLimitingCallback(actorContext);
 
         return voidOperation("commit", new CommitTransaction(transactionId).toSerializable(),
                 CommitTransactionReply.SERIALIZABLE_CLASS, true, operationCallback);
@@ -239,7 +236,7 @@ public class ThreePhaseCommitCohortProxy implements DOMStoreThreePhaseCommitCoho
                                 propagateException, returnFuture, callback);
                     }
                 }
-            }, actorContext.getActorSystem().dispatcher());
+            }, actorContext.getClientDispatcher());
         }
 
         return returnFuture;
@@ -304,65 +301,11 @@ public class ThreePhaseCommitCohortProxy implements DOMStoreThreePhaseCommitCoho
                     callback.success();
                 }
             }
-        }, actorContext.getActorSystem().dispatcher());
+        }, actorContext.getClientDispatcher());
     }
 
     @VisibleForTesting
     List<Future<ActorSelection>> getCohortFutures() {
         return Collections.unmodifiableList(cohortFutures);
     }
-
-    private static interface OperationCallback {
-        void run();
-        void success();
-        void failure();
-    }
-
-    private static class CommitCallback implements OperationCallback{
-
-        private static final Logger LOG = LoggerFactory.getLogger(CommitCallback.class);
-        private static final String COMMIT = "commit";
-
-        private final Timer commitTimer;
-        private final ActorContext actorContext;
-        private Timer.Context timerContext;
-
-        CommitCallback(ActorContext actorContext){
-            this.actorContext = actorContext;
-            commitTimer = actorContext.getOperationTimer(COMMIT);
-        }
-
-        @Override
-        public void run() {
-            timerContext = commitTimer.time();
-        }
-
-        @Override
-        public void success() {
-            timerContext.stop();
-
-            Snapshot timerSnapshot = commitTimer.getSnapshot();
-            double allowedLatencyInNanos = timerSnapshot.get95thPercentile();
-
-            long commitTimeoutInSeconds = actorContext.getDatastoreContext()
-                    .getShardTransactionCommitTimeoutInSeconds();
-            long commitTimeoutInNanos = TimeUnit.SECONDS.toNanos(commitTimeoutInSeconds);
-
-            // Here we are trying to find out how many transactions per second are allowed
-            double newRateLimit = ((double) commitTimeoutInNanos / allowedLatencyInNanos) / commitTimeoutInSeconds;
-
-            LOG.debug("Data Store {} commit rateLimit adjusted to {} allowedLatencyInNanos = {}",
-                    actorContext.getDataStoreType(), newRateLimit, allowedLatencyInNanos);
-
-            actorContext.setTxCreationLimit(newRateLimit);
-        }
-
-        @Override
-        public void failure() {
-            // This would mean we couldn't get a transaction completed in 30 seconds which is
-            // the default transaction commit timeout. Using the timeout information to figure out the rate limit is
-            // not going to be useful - so we leave it as it is
-        }
-    }
-
 }