*/
package org.opendaylight.controller.cluster.datastore;
-import akka.dispatch.OnComplete;
+import static com.google.common.base.Preconditions.checkArgument;
+import static java.util.Objects.requireNonNull;
+
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
-import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * Class for limiting operations. It extends {@link OnComplete}, so we can plug it seamlessly
- * into akka to release permits as futures complete.
+ * Class for limiting operations.
*/
-public class OperationLimiter extends OnComplete<Object> {
+public class OperationLimiter {
private static final Logger LOG = LoggerFactory.getLogger(OperationLimiter.class);
private final TransactionIdentifier identifier;
private final long acquireTimeout;
private final int maxPermits;
OperationLimiter(final TransactionIdentifier identifier, final int maxPermits, final long acquireTimeoutSeconds) {
- this.identifier = Preconditions.checkNotNull(identifier);
+ this.identifier = requireNonNull(identifier);
- Preconditions.checkArgument(acquireTimeoutSeconds >= 0);
+ checkArgument(acquireTimeoutSeconds >= 0);
this.acquireTimeout = TimeUnit.SECONDS.toNanos(acquireTimeoutSeconds);
- Preconditions.checkArgument(maxPermits >= 0);
+ checkArgument(maxPermits >= 0);
this.maxPermits = maxPermits;
this.semaphore = new Semaphore(maxPermits);
}
- void acquire() {
- acquire(1);
+ boolean acquire() {
+ return acquire(1);
}
- void acquire(final int acquirePermits) {
+ boolean acquire(final int acquirePermits) {
try {
- if (!semaphore.tryAcquire(acquirePermits, acquireTimeout, TimeUnit.NANOSECONDS)) {
- LOG.warn("Failed to acquire operation permit for transaction {}", identifier);
+ if (semaphore.tryAcquire(acquirePermits, acquireTimeout, TimeUnit.NANOSECONDS)) {
+ return true;
}
} catch (InterruptedException e) {
if (LOG.isDebugEnabled()) {
LOG.warn("Interrupted when trying to acquire operation permit for transaction {}", identifier);
}
}
+
+ return false;
}
- @Override
- public void onComplete(final Throwable throwable, final Object message) {
- if (message instanceof BatchedModificationsReply) {
- this.semaphore.release(((BatchedModificationsReply)message).getNumBatched());
- } else {
- this.semaphore.release();
- }
+ void release() {
+ release(1);
}
- public TransactionIdentifier getIdentifier() {
+ void release(final int permits) {
+ this.semaphore.release(permits);
+ }
+
+ @VisibleForTesting
+ TransactionIdentifier getIdentifier() {
return identifier;
}