/* * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. * * This program and the accompanying materials are made available under the * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ package org.opendaylight.controller.cluster.datastore; import akka.dispatch.OnComplete; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * Class for limiting operations. It extends {@link OnComplete}, so we can plug it seamlessly * into akka to release permits as futures complete. */ public class OperationLimiter extends OnComplete { private static final Logger LOG = LoggerFactory.getLogger(OperationLimiter.class); private final TransactionIdentifier identifier; private final long acquireTimeout; private final Semaphore semaphore; private final int maxPermits; OperationLimiter(final TransactionIdentifier identifier, final int maxPermits, final long acquireTimeoutSeconds) { this.identifier = Preconditions.checkNotNull(identifier); Preconditions.checkArgument(acquireTimeoutSeconds >= 0); this.acquireTimeout = TimeUnit.SECONDS.toNanos(acquireTimeoutSeconds); Preconditions.checkArgument(maxPermits >= 0); this.maxPermits = maxPermits; this.semaphore = new Semaphore(maxPermits); } void acquire() { acquire(1); } void acquire(final int acquirePermits) { try { if (!semaphore.tryAcquire(acquirePermits, acquireTimeout, TimeUnit.NANOSECONDS)) { LOG.warn("Failed to acquire operation permit for transaction {}", identifier); } } catch (InterruptedException e) { if (LOG.isDebugEnabled()) { LOG.debug("Interrupted when trying to acquire operation permit for transaction {}", identifier, e); } else { LOG.warn("Interrupted when trying to acquire operation permit for transaction {}", identifier); } } } @Override public void onComplete(final Throwable throwable, final Object message) { if (message instanceof BatchedModificationsReply) { this.semaphore.release(((BatchedModificationsReply)message).getNumBatched()); } else { this.semaphore.release(); } } public TransactionIdentifier getIdentifier() { return identifier; } @VisibleForTesting int availablePermits(){ return semaphore.availablePermits(); } /** * Release all the permits */ public void releaseAll() { this.semaphore.release(maxPermits-availablePermits()); } }