*/
package org.opendaylight.controller.cluster.datastore;
-import akka.dispatch.OnComplete;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
-import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * Class for limiting operations. It extends {@link OnComplete}, so we can plug it seamlessly
- * into akka to release permits as futures complete.
+ * Class for limiting operations.
*/
-public class OperationLimiter extends OnComplete<Object> {
+public class OperationLimiter {
private static final Logger LOG = LoggerFactory.getLogger(OperationLimiter.class);
private final TransactionIdentifier identifier;
private final long acquireTimeout;
this.semaphore = new Semaphore(maxPermits);
}
- void acquire() {
- acquire(1);
+ boolean acquire() {
+ return acquire(1);
}
- void acquire(final int acquirePermits) {
+ boolean acquire(final int acquirePermits) {
try {
- if (!semaphore.tryAcquire(acquirePermits, acquireTimeout, TimeUnit.NANOSECONDS)) {
- LOG.warn("Failed to acquire operation permit for transaction {}", identifier);
+ if (semaphore.tryAcquire(acquirePermits, acquireTimeout, TimeUnit.NANOSECONDS)) {
+ return true;
}
+
+ LOG.warn("Failed to acquire operation permit for transaction {}", identifier);
} catch (InterruptedException e) {
if (LOG.isDebugEnabled()) {
LOG.debug("Interrupted when trying to acquire operation permit for transaction {}", identifier, e);
LOG.warn("Interrupted when trying to acquire operation permit for transaction {}", identifier);
}
}
+
+ return false;
}
- @Override
- public void onComplete(final Throwable throwable, final Object message) {
- if (message instanceof BatchedModificationsReply) {
- this.semaphore.release(((BatchedModificationsReply)message).getNumBatched());
- } else {
- this.semaphore.release();
- }
+ void release() {
+ release(1);
+ }
+
+ void release(int permits) {
+ this.semaphore.release(permits);
}
public TransactionIdentifier getIdentifier() {