Code Review
/
controller.git
/ blobdiff
commit
grep
author
committer
pickaxe
?
search:
re
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
review
|
tree
raw
|
inline
| side by side
Define PayloadVersion.MAGNESIUM
[controller.git]
/
opendaylight
/
md-sal
/
sal-distributed-datastore
/
src
/
main
/
java
/
org
/
opendaylight
/
controller
/
cluster
/
datastore
/
OperationLimiter.java
diff --git
a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/OperationLimiter.java
b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/OperationLimiter.java
index b42230971b2fe276ea5bc8d39f25a3d2f455de83..3f0c98c47ff218bc105be34a8b9d2cab10ab8a88 100644
(file)
--- a/
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/OperationLimiter.java
+++ b/
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/OperationLimiter.java
@@
-7,44
+7,45
@@
*/
package org.opendaylight.controller.cluster.datastore;
*/
package org.opendaylight.controller.cluster.datastore;
-import akka.dispatch.OnComplete;
+import static com.google.common.base.Preconditions.checkArgument;
+import static java.util.Objects.requireNonNull;
+
import com.google.common.annotations.VisibleForTesting;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
-import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
-import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * Class for limiting operations. It extends {@link OnComplete}, so we can plug it seamlessly
- * into akka to release permits as futures complete.
+ * Class for limiting operations.
*/
*/
-public class OperationLimiter
extends OnComplete<Object>
{
+public class OperationLimiter {
private static final Logger LOG = LoggerFactory.getLogger(OperationLimiter.class);
private final TransactionIdentifier identifier;
private final long acquireTimeout;
private final Semaphore semaphore;
private static final Logger LOG = LoggerFactory.getLogger(OperationLimiter.class);
private final TransactionIdentifier identifier;
private final long acquireTimeout;
private final Semaphore semaphore;
+ private final int maxPermits;
- OperationLimiter(final TransactionIdentifier identifier, final int maxPermits, final
int
acquireTimeoutSeconds) {
- this.identifier =
Preconditions.checkNot
Null(identifier);
+ OperationLimiter(final TransactionIdentifier identifier, final int maxPermits, final
long
acquireTimeoutSeconds) {
+ this.identifier =
requireNon
Null(identifier);
-
Preconditions.
checkArgument(acquireTimeoutSeconds >= 0);
+ checkArgument(acquireTimeoutSeconds >= 0);
this.acquireTimeout = TimeUnit.SECONDS.toNanos(acquireTimeoutSeconds);
this.acquireTimeout = TimeUnit.SECONDS.toNanos(acquireTimeoutSeconds);
- Preconditions.checkArgument(maxPermits >= 0);
+ checkArgument(maxPermits >= 0);
+ this.maxPermits = maxPermits;
this.semaphore = new Semaphore(maxPermits);
}
this.semaphore = new Semaphore(maxPermits);
}
-
void
acquire() {
- acquire(1);
+
boolean
acquire() {
+
return
acquire(1);
}
}
-
private void
acquire(final int acquirePermits) {
+
boolean
acquire(final int acquirePermits) {
try {
try {
- if (
!
semaphore.tryAcquire(acquirePermits, acquireTimeout, TimeUnit.NANOSECONDS)) {
-
LOG.warn("Failed to acquire operation permit for transaction {}", identifier)
;
+ if (semaphore.tryAcquire(acquirePermits, acquireTimeout, TimeUnit.NANOSECONDS)) {
+
return true
;
}
} catch (InterruptedException e) {
if (LOG.isDebugEnabled()) {
}
} catch (InterruptedException e) {
if (LOG.isDebugEnabled()) {
@@
-53,27
+54,32
@@
public class OperationLimiter extends OnComplete<Object> {
LOG.warn("Interrupted when trying to acquire operation permit for transaction {}", identifier);
}
}
LOG.warn("Interrupted when trying to acquire operation permit for transaction {}", identifier);
}
}
+
+ return false;
}
void release() {
}
void release() {
-
this.semaphore.release(
);
+
release(1
);
}
}
- @Override
- public void onComplete(final Throwable throwable, final Object message) {
- if (message instanceof BatchedModificationsReply) {
- this.semaphore.release(((BatchedModificationsReply)message).getNumBatched());
- } else {
- this.semaphore.release();
- }
+ void release(final int permits) {
+ this.semaphore.release(permits);
}
}
- public TransactionIdentifier getIdentifier() {
+ @VisibleForTesting
+ TransactionIdentifier getIdentifier() {
return identifier;
}
@VisibleForTesting
return identifier;
}
@VisibleForTesting
- Semaphore getSemaphore() {
- return semaphore;
+ int availablePermits() {
+ return semaphore.availablePermits();
+ }
+
+ /**
+ * Release all the permits.
+ */
+ public void releaseAll() {
+ this.semaphore.release(maxPermits - availablePermits());
}
}
}
}