private final TransactionIdentifier identifier;
private final long acquireTimeout;
private final Semaphore semaphore;
+ private final int maxPermits;
- OperationLimiter(final TransactionIdentifier identifier, final int maxPermits, final int acquireTimeoutSeconds) {
+ OperationLimiter(final TransactionIdentifier identifier, final int maxPermits, final long acquireTimeoutSeconds) {
this.identifier = Preconditions.checkNotNull(identifier);
Preconditions.checkArgument(acquireTimeoutSeconds >= 0);
this.acquireTimeout = TimeUnit.SECONDS.toNanos(acquireTimeoutSeconds);
Preconditions.checkArgument(maxPermits >= 0);
+ this.maxPermits = maxPermits;
this.semaphore = new Semaphore(maxPermits);
}
acquire(1);
}
- private void acquire(final int acquirePermits) {
+ void acquire(final int acquirePermits) {
try {
if (!semaphore.tryAcquire(acquirePermits, acquireTimeout, TimeUnit.NANOSECONDS)) {
LOG.warn("Failed to acquire operation permit for transaction {}", identifier);
}
}
- void release() {
- this.semaphore.release();
- }
-
@Override
public void onComplete(final Throwable throwable, final Object message) {
if (message instanceof BatchedModificationsReply) {
}
@VisibleForTesting
- Semaphore getSemaphore() {
- return semaphore;
+ int availablePermits(){
+ return semaphore.availablePermits();
+ }
+
+ /**
+ * Release all the permits
+ */
+ public void releaseAll() {
+ this.semaphore.release(maxPermits-availablePermits());
}
}