d7cbce74a93fcb77edf8a124da8dd6023e50430c
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / utils / TransactionRateLimiter.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore.utils;
10
11 import com.codahale.metrics.Snapshot;
12 import com.codahale.metrics.Timer;
13 import com.google.common.annotations.VisibleForTesting;
14 import com.google.common.util.concurrent.RateLimiter;
15 import java.util.concurrent.TimeUnit;
16 import java.util.concurrent.atomic.AtomicLong;
17 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
18 import org.slf4j.Logger;
19 import org.slf4j.LoggerFactory;
20
21 public class TransactionRateLimiter {
22     private static final Logger LOG = LoggerFactory.getLogger(TransactionRateLimiter.class);
23
24     private final ActorContext actorContext;
25     private final long commitTimeoutInSeconds;
26     private final String dataStoreType;
27     private final RateLimiter txRateLimiter;
28     private final AtomicLong acquireCount = new AtomicLong();
29
30     private volatile long pollOnCount = 1;
31
32     public TransactionRateLimiter(ActorContext actorContext){
33         this.actorContext = actorContext;
34         this.commitTimeoutInSeconds = actorContext.getDatastoreContext().getShardTransactionCommitTimeoutInSeconds();
35         this.dataStoreType = actorContext.getDataStoreType();
36         this.txRateLimiter = RateLimiter.create(actorContext.getDatastoreContext().getTransactionCreationInitialRateLimit());
37     }
38
39     public void acquire(){
40         adjustRateLimit();
41         txRateLimiter.acquire();
42     }
43
44     private void adjustRateLimit() {
45         final long count = acquireCount.incrementAndGet();
46         if(count >= pollOnCount) {
47             final Timer commitTimer = actorContext.getOperationTimer(ActorContext.COMMIT);
48             double newRateLimit = calculateNewRateLimit(commitTimer, commitTimeoutInSeconds);
49
50             if (newRateLimit < 1.0) {
51                 newRateLimit = getRateLimitFromOtherDataStores();
52             }
53
54             if (newRateLimit >= 1.0) {
55                 txRateLimiter.setRate(newRateLimit);
56                 pollOnCount = count + ((long) newRateLimit/2);
57             }
58         }
59     }
60
61     public double getTxCreationLimit(){
62         return txRateLimiter.getRate();
63     }
64
65     private double getRateLimitFromOtherDataStores(){
66         // Since we have no rate data for unused Tx's data store, adjust to the rate from another
67         // data store that does have rate data.
68         for(String datastoreType: DatastoreContext.getGlobalDatastoreTypes()) {
69             if(datastoreType.equals(this.dataStoreType)) {
70                 continue;
71             }
72
73             double newRateLimit = calculateNewRateLimit(actorContext.getOperationTimer(datastoreType, ActorContext.COMMIT),
74                     this.commitTimeoutInSeconds);
75             if(newRateLimit > 0.0) {
76                 LOG.debug("On unused Tx - data Store {} commit rateLimit adjusted to {}",
77                         this.dataStoreType, newRateLimit);
78
79                 return newRateLimit;
80             }
81         }
82
83         return -1.0D;
84     }
85
86     private double calculateNewRateLimit(Timer commitTimer, long commitTimeoutInSeconds) {
87         if(commitTimer == null) {
88             // This can happen in unit tests.
89             return 0;
90         }
91
92         Snapshot timerSnapshot = commitTimer.getSnapshot();
93         double newRateLimit = 0;
94
95         long commitTimeoutInNanos = TimeUnit.SECONDS.toNanos(commitTimeoutInSeconds);
96
97         // Find the time that it takes for transactions to get executed in every 10th percentile
98         // Compute the rate limit for that percentile and sum it up
99         for(int i=1;i<=10;i++){
100             // Get the amount of time transactions take in the i*10th percentile
101             double percentileTimeInNanos = timerSnapshot.getValue(i * 0.1D);
102
103             if(percentileTimeInNanos > 0) {
104                 // Figure out the rate limit for the i*10th percentile in nanos
105                 double percentileRateLimit = (commitTimeoutInNanos / percentileTimeInNanos);
106
107                 // Add the percentileRateLimit to the total rate limit
108                 newRateLimit += percentileRateLimit;
109             }
110         }
111
112         // Compute the rate limit per second
113         return newRateLimit/(commitTimeoutInSeconds*10);
114     }
115
116     @VisibleForTesting
117     long getPollOnCount() {
118         return pollOnCount;
119     }
120
121     @VisibleForTesting
122     void setPollOnCount(long value){
123         pollOnCount = value;
124     }
125
126     @VisibleForTesting
127     void setAcquireCount(long value){
128         acquireCount.set(value);
129     }
130
131 }