switch(parent.getType()) {
case READ_ONLY:
final DOMStoreReadTransaction readOnly = factory.newReadOnlyTransaction(parent.getIdentifier());
- return new LocalTransactionContext(parent.getIdentifier(), readOnly, parent.getCompleter()) {
+ return new LocalTransactionContext(parent.getIdentifier(), readOnly, parent.getLimiter()) {
@Override
protected DOMStoreWriteTransaction getWriteDelegate() {
throw new UnsupportedOperationException();
};
case READ_WRITE:
final DOMStoreReadWriteTransaction readWrite = factory.newReadWriteTransaction(parent.getIdentifier());
- return new LocalTransactionContext(parent.getIdentifier(), readWrite, parent.getCompleter()) {
+ return new LocalTransactionContext(parent.getIdentifier(), readWrite, parent.getLimiter()) {
@Override
protected DOMStoreWriteTransaction getWriteDelegate() {
return readWrite;
};
case WRITE_ONLY:
final DOMStoreWriteTransaction writeOnly = factory.newWriteOnlyTransaction(parent.getIdentifier());
- return new LocalTransactionContext(parent.getIdentifier(), writeOnly, parent.getCompleter()) {
+ return new LocalTransactionContext(parent.getIdentifier(), writeOnly, parent.getLimiter()) {
@Override
protected DOMStoreWriteTransaction getWriteDelegate() {
return writeOnly;
abstract class LocalTransactionContext extends AbstractTransactionContext {
private final DOMStoreTransaction txDelegate;
- private final OperationCompleter completer;
+ private final OperationLimiter limiter;
- LocalTransactionContext(TransactionIdentifier identifier, DOMStoreTransaction txDelegate, OperationCompleter completer) {
+ LocalTransactionContext(TransactionIdentifier identifier, DOMStoreTransaction txDelegate, OperationLimiter limiter) {
super(identifier);
this.txDelegate = Preconditions.checkNotNull(txDelegate);
- this.completer = Preconditions.checkNotNull(completer);
+ this.limiter = Preconditions.checkNotNull(limiter);
}
protected abstract DOMStoreWriteTransaction getWriteDelegate();
public void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
incrementModificationCount();
getWriteDelegate().write(path, data);
- completer.onComplete(null, null);
+ limiter.release();
}
@Override
public void mergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
incrementModificationCount();
getWriteDelegate().merge(path, data);
- completer.onComplete(null, null);
+ limiter.release();
}
@Override
public void deleteData(YangInstanceIdentifier path) {
incrementModificationCount();
getWriteDelegate().delete(path);
- completer.onComplete(null, null);
+ limiter.release();
}
@Override
@Override
public void onSuccess(Optional<NormalizedNode<?, ?>> result) {
proxyFuture.set(result);
- completer.onComplete(null, null);
+ limiter.release();
}
@Override
public void onFailure(Throwable t) {
proxyFuture.setException(t);
- completer.onComplete(null, null);
+ limiter.release();
}
});
}
@Override
public void onSuccess(Boolean result) {
proxyFuture.set(result);
- completer.onComplete(null, null);
+ limiter.release();
}
@Override
public void onFailure(Throwable t) {
proxyFuture.setException(t);
- completer.onComplete(null, null);
+ limiter.release();
}
});
}
private LocalThreePhaseCommitCohort ready() {
logModificationCount();
LocalThreePhaseCommitCohort ready = (LocalThreePhaseCommitCohort) getWriteDelegate().ready();
- completer.onComplete(null, null);
+ limiter.release();
return ready;
}
import akka.actor.ActorSelection;
import com.google.common.base.Optional;
import com.google.common.util.concurrent.SettableFuture;
-import java.util.concurrent.Semaphore;
-
import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
import org.opendaylight.controller.md.sal.common.api.data.DataStoreUnavailableException;
private static final Logger LOG = LoggerFactory.getLogger(NoOpTransactionContext.class);
private final Throwable failure;
- private final Semaphore operationLimiter;
+ private final OperationLimiter operationLimiter;
- public NoOpTransactionContext(Throwable failure, TransactionIdentifier identifier, Semaphore operationLimiter) {
+ public NoOpTransactionContext(Throwable failure, TransactionIdentifier identifier, OperationLimiter operationLimiter) {
super(identifier);
this.failure = failure;
this.operationLimiter = operationLimiter;
+++ /dev/null
-/*
- * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.datastore;
-
-import akka.dispatch.OnComplete;
-import com.google.common.base.Preconditions;
-import java.util.concurrent.Semaphore;
-import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
-
-public final class OperationCompleter extends OnComplete<Object> {
- private final Semaphore operationLimiter;
-
- OperationCompleter(Semaphore operationLimiter){
- this.operationLimiter = Preconditions.checkNotNull(operationLimiter);
- }
-
- @Override
- public void onComplete(Throwable throwable, Object message) {
- if(message instanceof BatchedModificationsReply) {
- this.operationLimiter.release(((BatchedModificationsReply)message).getNumBatched());
- } else {
- this.operationLimiter.release();
- }
- }
-}
\ No newline at end of file
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import akka.dispatch.OnComplete;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
+import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Class for limiting operations. It extends {@link OnComplete}, so we can plug it seamlessly
+ * into akka to release permits as futures complete.
+ */
+public class OperationLimiter extends OnComplete<Object> {
+ private static final Logger LOG = LoggerFactory.getLogger(OperationLimiter.class);
+ private final TransactionIdentifier identifier;
+ private final long acquireTimeout;
+ private final Semaphore semaphore;
+
+ OperationLimiter(final TransactionIdentifier identifier, final int maxPermits, final int acquireTimeoutSeconds) {
+ this.identifier = Preconditions.checkNotNull(identifier);
+
+ Preconditions.checkArgument(acquireTimeoutSeconds >= 0);
+ this.acquireTimeout = TimeUnit.SECONDS.toNanos(acquireTimeoutSeconds);
+
+ Preconditions.checkArgument(maxPermits >= 0);
+ this.semaphore = new Semaphore(maxPermits);
+ }
+
+ void acquire() {
+ acquire(1);
+ }
+
+ private void acquire(final int acquirePermits) {
+ try {
+ if (!semaphore.tryAcquire(acquirePermits, acquireTimeout, TimeUnit.NANOSECONDS)) {
+ LOG.warn("Failed to acquire operation permit for transaction {}", identifier);
+ }
+ } catch (InterruptedException e) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Interrupted when trying to acquire operation permit for transaction {}", identifier, e);
+ } else {
+ LOG.warn("Interrupted when trying to acquire operation permit for transaction {}", identifier);
+ }
+ }
+ }
+
+ void release() {
+ this.semaphore.release();
+ }
+
+ @Override
+ public void onComplete(final Throwable throwable, final Object message) {
+ if (message instanceof BatchedModificationsReply) {
+ this.semaphore.release(((BatchedModificationsReply)message).getNumBatched());
+ } else {
+ this.semaphore.release();
+ }
+ }
+
+ public TransactionIdentifier getIdentifier() {
+ return identifier;
+ }
+
+ @VisibleForTesting
+ Semaphore getSemaphore() {
+ return semaphore;
+ }
+}
private final boolean isTxActorLocal;
private final short remoteTransactionVersion;
- private final OperationCompleter operationCompleter;
+ private final OperationLimiter operationCompleter;
private BatchedModifications batchedModifications;
private int totalBatchedModificationsSent;
protected RemoteTransactionContext(ActorSelection actor, TransactionIdentifier identifier,
ActorContext actorContext, boolean isTxActorLocal,
- short remoteTransactionVersion, OperationCompleter operationCompleter) {
+ short remoteTransactionVersion, OperationLimiter limiter) {
super(identifier);
this.actor = actor;
this.actorContext = actorContext;
this.isTxActorLocal = isTxActorLocal;
this.remoteTransactionVersion = remoteTransactionVersion;
- this.operationCompleter = operationCompleter;
+ this.operationCompleter = limiter;
}
private Future<Object> completeOperation(Future<Object> operationFuture){
import akka.actor.ActorSelection;
import akka.dispatch.OnComplete;
import com.google.common.base.Preconditions;
-import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import org.opendaylight.controller.cluster.datastore.compat.PreLithiumTransactionContextImpl;
import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
return parent.getActorContext();
}
- private Semaphore getOperationLimiter() {
+ private OperationLimiter getOperationLimiter() {
return parent.getLimiter();
}
if (remoteTransactionVersion < DataStoreVersions.LITHIUM_VERSION) {
ret = new PreLithiumTransactionContextImpl(transactionPath, transactionActor, getIdentifier(),
- getActorContext(), isTxActorLocal, remoteTransactionVersion, parent.getCompleter());
+ getActorContext(), isTxActorLocal, remoteTransactionVersion, parent.getLimiter());
} else {
ret = new RemoteTransactionContext(transactionActor, getIdentifier(), getActorContext(),
- isTxActorLocal, remoteTransactionVersion, parent.getCompleter());
+ isTxActorLocal, remoteTransactionVersion, parent.getLimiter());
}
if(parent.getType() == TransactionType.READ_ONLY) {
*/
private void enqueueTransactionOperation(final TransactionOperation operation) {
final boolean invokeOperation;
- synchronized(queuedTxOperations) {
+ synchronized (queuedTxOperations) {
if (transactionContext == null) {
LOG.debug("Tx {} Queuing TransactionOperation", getIdentifier());
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.TimeUnit;
import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
private final Map<String, TransactionContextWrapper> txContextAdapters = new HashMap<>();
private final AbstractTransactionContextFactory<?> txContextFactory;
+ private final OperationLimiter limiter;
private final TransactionType type;
private TransactionState state = TransactionState.OPEN;
- private volatile OperationCompleter operationCompleter;
- private volatile Semaphore operationLimiter;
@VisibleForTesting
public TransactionProxy(final AbstractTransactionContextFactory<?> txContextFactory, final TransactionType type) {
this.txContextFactory = txContextFactory;
this.type = Preconditions.checkNotNull(type);
+ // Note : Currently mailbox-capacity comes from akka.conf and not from the config-subsystem
+ this.limiter = new OperationLimiter(getIdentifier(),
+ getActorContext().getTransactionOutstandingOperationLimit(),
+ getActorContext().getDatastoreContext().getOperationTimeoutInSeconds());
+
LOG.debug("New {} Tx - {}", type, getIdentifier());
}
LOG.debug("Tx {} exists {}", getIdentifier(), path);
- throttleOperation();
+ limiter.acquire();
final SettableFuture<Boolean> proxyFuture = SettableFuture.create();
TransactionContextWrapper contextAdapter = getContextAdapter(path);
if (YangInstanceIdentifier.EMPTY.equals(path)) {
return readAllData();
} else {
- throttleOperation();
+ limiter.acquire();
return singleShardRead(shardNameFromIdentifier(path), path);
}
LOG.debug("Tx {} delete {}", getIdentifier(), path);
- throttleOperation();
+ limiter.acquire();
TransactionContextWrapper contextAdapter = getContextAdapter(path);
contextAdapter.maybeExecuteTransactionOperation(new TransactionOperation() {
LOG.debug("Tx {} merge {}", getIdentifier(), path);
- throttleOperation();
+ limiter.acquire();
TransactionContextWrapper contextAdapter = getContextAdapter(path);
contextAdapter.maybeExecuteTransactionOperation(new TransactionOperation() {
LOG.debug("Tx {} write {}", getIdentifier(), path);
- throttleOperation();
+ limiter.acquire();
TransactionContextWrapper contextAdapter = getContextAdapter(path);
contextAdapter.maybeExecuteTransactionOperation(new TransactionOperation() {
private AbstractThreePhaseCommitCohort<?> createSingleCommitCohort(final String shardName,
final TransactionContextWrapper contextAdapter) {
- throttleOperation();
+ limiter.acquire();
LOG.debug("Tx {} Readying transaction for shard {}", getIdentifier(), shardName);
private AbstractThreePhaseCommitCohort<ActorSelection> createMultiCommitCohort(
final Set<Entry<String, TransactionContextWrapper>> txContextAdapterEntries) {
- throttleOperation();
+ limiter.acquire();
final List<Future<ActorSelection>> cohortFutures = new ArrayList<>(txContextAdapterEntries.size());
for (Entry<String, TransactionContextWrapper> e : txContextAdapterEntries) {
LOG.debug("Tx {} Readying transaction for shard {}", getIdentifier(), e.getKey());
return txContextFactory.getActorContext();
}
- OperationCompleter getCompleter() {
- OperationCompleter ret = operationCompleter;
- if (ret == null) {
- final Semaphore s = getLimiter();
- ret = new OperationCompleter(s);
- operationCompleter = ret;
- }
-
- return ret;
- }
-
- Semaphore getLimiter() {
- Semaphore ret = operationLimiter;
- if (ret == null) {
- // Note : Currently mailbox-capacity comes from akka.conf and not from the config-subsystem
- ret = new Semaphore(getActorContext().getTransactionOutstandingOperationLimit());
- operationLimiter = ret;
- }
- return ret;
- }
-
- void throttleOperation() {
- throttleOperation(1);
- }
-
- private void throttleOperation(int acquirePermits) {
- try {
- if (!getLimiter().tryAcquire(acquirePermits,
- getActorContext().getDatastoreContext().getOperationTimeoutInSeconds(), TimeUnit.SECONDS)){
- LOG.warn("Failed to acquire operation permit for transaction {}", getIdentifier());
- }
- } catch (InterruptedException e) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Interrupted when trying to acquire operation permit for transaction {}", getIdentifier(), e);
- } else {
- LOG.warn("Interrupted when trying to acquire operation permit for transaction {}", getIdentifier());
- }
- }
+ OperationLimiter getLimiter() {
+ return limiter;
}
}
import akka.actor.ActorSelection;
import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
-import org.opendaylight.controller.cluster.datastore.OperationCompleter;
+import org.opendaylight.controller.cluster.datastore.OperationLimiter;
import org.opendaylight.controller.cluster.datastore.RemoteTransactionContext;
import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
public PreLithiumTransactionContextImpl(String transactionPath, ActorSelection actor, TransactionIdentifier identifier,
ActorContext actorContext, boolean isTxActorLocal,
- short remoteTransactionVersion, OperationCompleter operationCompleter) {
- super(actor, identifier, actorContext, isTxActorLocal, remoteTransactionVersion, operationCompleter);
+ short remoteTransactionVersion, OperationLimiter limiter) {
+ super(actor, identifier, actorContext, isTxActorLocal, remoteTransactionVersion, limiter);
this.transactionPath = transactionPath;
}
import com.google.common.base.Optional;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.SettableFuture;
-import java.util.concurrent.Semaphore;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mock;
public class LocalTransactionContextTest {
@Mock
- Semaphore limiter;
+ OperationLimiter limiter;
@Mock
TransactionIdentifier identifier;
@Before
public void setUp(){
MockitoAnnotations.initMocks(this);
- localTransactionContext = new LocalTransactionContext(identifier, readWriteTransaction, new OperationCompleter(limiter)) {
+ localTransactionContext = new LocalTransactionContext(identifier, readWriteTransaction, limiter) {
@Override
protected DOMStoreWriteTransaction getWriteDelegate() {
return readWriteTransaction;
@Test
public void testWrite(){
YangInstanceIdentifier yangInstanceIdentifier = YangInstanceIdentifier.builder().build();
- NormalizedNode normalizedNode = mock(NormalizedNode.class);
+ NormalizedNode<?, ?> normalizedNode = mock(NormalizedNode.class);
localTransactionContext.writeData(yangInstanceIdentifier, normalizedNode);
verify(limiter).release();
verify(readWriteTransaction).write(yangInstanceIdentifier, normalizedNode);
@Test
public void testMerge(){
YangInstanceIdentifier yangInstanceIdentifier = YangInstanceIdentifier.builder().build();
- NormalizedNode normalizedNode = mock(NormalizedNode.class);
+ NormalizedNode<?, ?> normalizedNode = mock(NormalizedNode.class);
localTransactionContext.mergeData(yangInstanceIdentifier, normalizedNode);
verify(limiter).release();
verify(readWriteTransaction).merge(yangInstanceIdentifier, normalizedNode);
@Test
public void testRead(){
YangInstanceIdentifier yangInstanceIdentifier = YangInstanceIdentifier.builder().build();
- NormalizedNode normalizedNode = mock(NormalizedNode.class);
+ NormalizedNode<?, ?> normalizedNode = mock(NormalizedNode.class);
doReturn(Futures.immediateCheckedFuture(Optional.of(normalizedNode))).when(readWriteTransaction).read(yangInstanceIdentifier);
localTransactionContext.readData(yangInstanceIdentifier, SettableFuture.<Optional<NormalizedNode<?,?>>>create());
verify(limiter).release();
}
-}
\ No newline at end of file
+}
+++ /dev/null
-/*
- * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.datastore;
-
-import static org.junit.Assert.assertEquals;
-import java.util.concurrent.Semaphore;
-import org.junit.Test;
-import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
-import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
-
-/**
- * Unit tests for OperationCompleter.
- *
- * @author Thomas Pantelis
- */
-public class OperationCompleterTest {
-
- @Test
- public void testOnComplete() throws Exception {
- int permits = 10;
- Semaphore operationLimiter = new Semaphore(permits);
- operationLimiter.acquire(permits);
- int availablePermits = 0;
-
- OperationCompleter completer = new OperationCompleter(operationLimiter );
-
- completer.onComplete(null, DataExistsReply.create(true));
- assertEquals("availablePermits", ++availablePermits, operationLimiter.availablePermits());
-
- completer.onComplete(null, DataExistsReply.create(true));
- assertEquals("availablePermits", ++availablePermits, operationLimiter.availablePermits());
-
- completer.onComplete(null, new IllegalArgumentException());
- assertEquals("availablePermits", ++availablePermits, operationLimiter.availablePermits());
-
- completer.onComplete(null, new BatchedModificationsReply(4));
- availablePermits += 4;
- assertEquals("availablePermits", availablePermits, operationLimiter.availablePermits());
- }
-}
--- /dev/null
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import static org.junit.Assert.assertEquals;
+import java.util.concurrent.Semaphore;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
+import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
+import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
+
+/**
+ * Unit tests for OperationCompleter.
+ *
+ * @author Thomas Pantelis
+ */
+public class OperationLimiterTest {
+
+ @Test
+ public void testOnComplete() throws Exception {
+ int permits = 10;
+ OperationLimiter limiter = new OperationLimiter(new TransactionIdentifier("foo", 1), permits, 1);
+ Semaphore semaphore = limiter.getSemaphore();
+ semaphore.acquire(permits);
+ int availablePermits = 0;
+
+ limiter.onComplete(null, DataExistsReply.create(true));
+ assertEquals("availablePermits", ++availablePermits, semaphore.availablePermits());
+
+ limiter.onComplete(null, DataExistsReply.create(true));
+ assertEquals("availablePermits", ++availablePermits, semaphore.availablePermits());
+
+ limiter.onComplete(null, new IllegalArgumentException());
+ assertEquals("availablePermits", ++availablePermits, semaphore.availablePermits());
+
+ limiter.onComplete(null, new BatchedModificationsReply(4));
+ availablePermits += 4;
+ assertEquals("availablePermits", availablePermits, semaphore.availablePermits());
+ }
+}