X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FOperationLimiter.java;h=3f0c98c47ff218bc105be34a8b9d2cab10ab8a88;hp=ad26d0a50b4d19dccc3afee4086ea6fb16c86726;hb=c032937639e5bc223deb8ec60fab9315c16edd16;hpb=c796596b5c46b5203c30b143e6282662e66c5642 diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/OperationLimiter.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/OperationLimiter.java index ad26d0a50b..3f0c98c47f 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/OperationLimiter.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/OperationLimiter.java @@ -7,21 +7,20 @@ */ package org.opendaylight.controller.cluster.datastore; -import akka.dispatch.OnComplete; +import static com.google.common.base.Preconditions.checkArgument; +import static java.util.Objects.requireNonNull; + import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; -import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * Class for limiting operations. It extends {@link OnComplete}, so we can plug it seamlessly - * into akka to release permits as futures complete. + * Class for limiting operations. */ -public class OperationLimiter extends OnComplete { +public class OperationLimiter { private static final Logger LOG = LoggerFactory.getLogger(OperationLimiter.class); private final TransactionIdentifier identifier; private final long acquireTimeout; @@ -29,24 +28,24 @@ public class OperationLimiter extends OnComplete { private final int maxPermits; OperationLimiter(final TransactionIdentifier identifier, final int maxPermits, final long acquireTimeoutSeconds) { - this.identifier = Preconditions.checkNotNull(identifier); + this.identifier = requireNonNull(identifier); - Preconditions.checkArgument(acquireTimeoutSeconds >= 0); + checkArgument(acquireTimeoutSeconds >= 0); this.acquireTimeout = TimeUnit.SECONDS.toNanos(acquireTimeoutSeconds); - Preconditions.checkArgument(maxPermits >= 0); + checkArgument(maxPermits >= 0); this.maxPermits = maxPermits; this.semaphore = new Semaphore(maxPermits); } - void acquire() { - acquire(1); + boolean acquire() { + return acquire(1); } - void acquire(final int acquirePermits) { + boolean acquire(final int acquirePermits) { try { - if (!semaphore.tryAcquire(acquirePermits, acquireTimeout, TimeUnit.NANOSECONDS)) { - LOG.warn("Failed to acquire operation permit for transaction {}", identifier); + if (semaphore.tryAcquire(acquirePermits, acquireTimeout, TimeUnit.NANOSECONDS)) { + return true; } } catch (InterruptedException e) { if (LOG.isDebugEnabled()) { @@ -55,30 +54,32 @@ public class OperationLimiter extends OnComplete { LOG.warn("Interrupted when trying to acquire operation permit for transaction {}", identifier); } } + + return false; } - @Override - public void onComplete(final Throwable throwable, final Object message) { - if (message instanceof BatchedModificationsReply) { - this.semaphore.release(((BatchedModificationsReply)message).getNumBatched()); - } else { - this.semaphore.release(); - } + void release() { + release(1); } - public TransactionIdentifier getIdentifier() { + void release(final int permits) { + this.semaphore.release(permits); + } + + @VisibleForTesting + TransactionIdentifier getIdentifier() { return identifier; } @VisibleForTesting - int availablePermits(){ + int availablePermits() { return semaphore.availablePermits(); } /** - * Release all the permits + * Release all the permits. */ public void releaseAll() { - this.semaphore.release(maxPermits-availablePermits()); + this.semaphore.release(maxPermits - availablePermits()); } }