X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FOperationLimiter.java;h=3f0c98c47ff218bc105be34a8b9d2cab10ab8a88;hb=2a6aa1775604906755883f810ee9ea6d5f286135;hp=b42230971b2fe276ea5bc8d39f25a3d2f455de83;hpb=c7e1ddeaf842ebb696c8dd38c0ca14c925ee31a1;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/OperationLimiter.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/OperationLimiter.java index b42230971b..3f0c98c47f 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/OperationLimiter.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/OperationLimiter.java @@ -7,44 +7,45 @@ */ package org.opendaylight.controller.cluster.datastore; -import akka.dispatch.OnComplete; +import static com.google.common.base.Preconditions.checkArgument; +import static java.util.Objects.requireNonNull; + import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; -import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier; -import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply; +import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * Class for limiting operations. It extends {@link OnComplete}, so we can plug it seamlessly - * into akka to release permits as futures complete. + * Class for limiting operations. */ -public class OperationLimiter extends OnComplete { +public class OperationLimiter { private static final Logger LOG = LoggerFactory.getLogger(OperationLimiter.class); private final TransactionIdentifier identifier; private final long acquireTimeout; private final Semaphore semaphore; + private final int maxPermits; - OperationLimiter(final TransactionIdentifier identifier, final int maxPermits, final int acquireTimeoutSeconds) { - this.identifier = Preconditions.checkNotNull(identifier); + OperationLimiter(final TransactionIdentifier identifier, final int maxPermits, final long acquireTimeoutSeconds) { + this.identifier = requireNonNull(identifier); - Preconditions.checkArgument(acquireTimeoutSeconds >= 0); + checkArgument(acquireTimeoutSeconds >= 0); this.acquireTimeout = TimeUnit.SECONDS.toNanos(acquireTimeoutSeconds); - Preconditions.checkArgument(maxPermits >= 0); + checkArgument(maxPermits >= 0); + this.maxPermits = maxPermits; this.semaphore = new Semaphore(maxPermits); } - void acquire() { - acquire(1); + boolean acquire() { + return acquire(1); } - private void acquire(final int acquirePermits) { + boolean acquire(final int acquirePermits) { try { - if (!semaphore.tryAcquire(acquirePermits, acquireTimeout, TimeUnit.NANOSECONDS)) { - LOG.warn("Failed to acquire operation permit for transaction {}", identifier); + if (semaphore.tryAcquire(acquirePermits, acquireTimeout, TimeUnit.NANOSECONDS)) { + return true; } } catch (InterruptedException e) { if (LOG.isDebugEnabled()) { @@ -53,27 +54,32 @@ public class OperationLimiter extends OnComplete { LOG.warn("Interrupted when trying to acquire operation permit for transaction {}", identifier); } } + + return false; } void release() { - this.semaphore.release(); + release(1); } - @Override - public void onComplete(final Throwable throwable, final Object message) { - if (message instanceof BatchedModificationsReply) { - this.semaphore.release(((BatchedModificationsReply)message).getNumBatched()); - } else { - this.semaphore.release(); - } + void release(final int permits) { + this.semaphore.release(permits); } - public TransactionIdentifier getIdentifier() { + @VisibleForTesting + TransactionIdentifier getIdentifier() { return identifier; } @VisibleForTesting - Semaphore getSemaphore() { - return semaphore; + int availablePermits() { + return semaphore.availablePermits(); + } + + /** + * Release all the permits. + */ + public void releaseAll() { + this.semaphore.release(maxPermits - availablePermits()); } }