import com.google.common.base.Preconditions;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
-import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
private final TransactionIdentifier identifier;
private final long acquireTimeout;
private final Semaphore semaphore;
+ private final int maxPermits;
- OperationLimiter(final TransactionIdentifier identifier, final int maxPermits, final int acquireTimeoutSeconds) {
+ OperationLimiter(final TransactionIdentifier identifier, final int maxPermits, final long acquireTimeoutSeconds) {
this.identifier = Preconditions.checkNotNull(identifier);
Preconditions.checkArgument(acquireTimeoutSeconds >= 0);
this.acquireTimeout = TimeUnit.SECONDS.toNanos(acquireTimeoutSeconds);
Preconditions.checkArgument(maxPermits >= 0);
+ this.maxPermits = maxPermits;
this.semaphore = new Semaphore(maxPermits);
}
acquire(1);
}
- private void acquire(final int acquirePermits) {
+ void acquire(final int acquirePermits) {
try {
if (!semaphore.tryAcquire(acquirePermits, acquireTimeout, TimeUnit.NANOSECONDS)) {
LOG.warn("Failed to acquire operation permit for transaction {}", identifier);
}
}
- void release() {
- this.semaphore.release();
- }
-
@Override
public void onComplete(final Throwable throwable, final Object message) {
if (message instanceof BatchedModificationsReply) {
}
@VisibleForTesting
- Semaphore getSemaphore() {
- return semaphore;
+ int availablePermits() {
+ return semaphore.availablePermits();
+ }
+
+ /**
+ * Release all the permits.
+ */
+ public void releaseAll() {
+ this.semaphore.release(maxPermits - availablePermits());
}
}