BUG 3019 : Fix Operation throttling for modification batching scenarios
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / OperationLimiter.java
index b42230971b2fe276ea5bc8d39f25a3d2f455de83..34a7ebf8f233f8ab3d14c57e3e63b17357035028 100644 (file)
@@ -26,6 +26,7 @@ public class OperationLimiter extends OnComplete<Object> {
     private final TransactionIdentifier identifier;
     private final long acquireTimeout;
     private final Semaphore semaphore;
+    private final int maxPermits;
 
     OperationLimiter(final TransactionIdentifier identifier, final int maxPermits, final int acquireTimeoutSeconds) {
         this.identifier = Preconditions.checkNotNull(identifier);
@@ -34,6 +35,7 @@ public class OperationLimiter extends OnComplete<Object> {
         this.acquireTimeout = TimeUnit.SECONDS.toNanos(acquireTimeoutSeconds);
 
         Preconditions.checkArgument(maxPermits >= 0);
+        this.maxPermits = maxPermits;
         this.semaphore = new Semaphore(maxPermits);
     }
 
@@ -41,7 +43,7 @@ public class OperationLimiter extends OnComplete<Object> {
         acquire(1);
     }
 
-    private void acquire(final int acquirePermits) {
+    void acquire(final int acquirePermits) {
         try {
             if (!semaphore.tryAcquire(acquirePermits, acquireTimeout, TimeUnit.NANOSECONDS)) {
                 LOG.warn("Failed to acquire operation permit for transaction {}", identifier);
@@ -55,10 +57,6 @@ public class OperationLimiter extends OnComplete<Object> {
         }
     }
 
-    void release() {
-        this.semaphore.release();
-    }
-
     @Override
     public void onComplete(final Throwable throwable, final Object message) {
         if (message instanceof BatchedModificationsReply) {
@@ -73,7 +71,14 @@ public class OperationLimiter extends OnComplete<Object> {
     }
 
     @VisibleForTesting
-    Semaphore getSemaphore() {
-        return semaphore;
+    int availablePermits(){
+        return semaphore.availablePermits();
+    }
+
+    /**
+     * Release all the permits
+     */
+    public void releaseAll() {
+        this.semaphore.release(maxPermits-availablePermits());
     }
 }