package org.opendaylight.controller.cluster.datastore;
-import akka.dispatch.OnComplete;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
-import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
- * Class for limiting operations. It extends {@link OnComplete}, so we can plug it seamlessly
- * into akka to release permits as futures complete.
+ * Class for limiting operations.
-public class OperationLimiter extends OnComplete<Object> {
+public class OperationLimiter {
private static final Logger LOG = LoggerFactory.getLogger(OperationLimiter.class);
private final TransactionIdentifier identifier;
private final long acquireTimeout;
this.semaphore = new Semaphore(maxPermits);
- void acquire() {
- acquire(1);
+ boolean acquire() {
+ return acquire(1);
- void acquire(final int acquirePermits) {
+ boolean acquire(final int acquirePermits) {
try {
- if (!semaphore.tryAcquire(acquirePermits, acquireTimeout, TimeUnit.NANOSECONDS)) {
- LOG.warn("Failed to acquire operation permit for transaction {}", identifier);
+ if (semaphore.tryAcquire(acquirePermits, acquireTimeout, TimeUnit.NANOSECONDS)) {
+ return true;
+ LOG.warn("Failed to acquire operation permit for transaction {}", identifier);
} catch (InterruptedException e) {
if (LOG.isDebugEnabled()) {
LOG.debug("Interrupted when trying to acquire operation permit for transaction {}", identifier, e);
LOG.warn("Interrupted when trying to acquire operation permit for transaction {}", identifier);
+ return false;
- @Override
- public void onComplete(final Throwable throwable, final Object message) {
- if (message instanceof BatchedModificationsReply) {
- this.semaphore.release(((BatchedModificationsReply)message).getNumBatched());
- } else {
- this.semaphore.release();
- }
+ void release() { // Returns a single permit to the limiter; shorthand for release(1).
+ release(1);
+ }
+ void release(int permits) {
+ this.semaphore.release(permits);
public TransactionIdentifier getIdentifier() {
package org.opendaylight.controller.cluster.datastore;
import akka.actor.ActorSelection;
+import akka.dispatch.Futures;
import akka.dispatch.OnComplete;
-import akka.util.Timeout;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.SettableFuture;
import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.messages.AbstractRead;
import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
-import org.opendaylight.controller.cluster.datastore.messages.SerializableMessage;
import org.opendaylight.controller.cluster.datastore.modification.AbstractModification;
import org.opendaylight.controller.cluster.datastore.modification.Modification;
import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
private BatchedModifications batchedModifications;
private int totalBatchedModificationsSent;
+ private int batchPermits;
+ /**
+ * We have observed a failed modification batch. This transaction context is effectively doomed, as the backend
+ * does not have a correct view of the world. If this happens, we do not limit operations but rather short-cut them
+ * to either a no-op (modifications) or a failure (reads). Once the transaction is ready, though, we send the
+ * message to resynchronize with the backend, sharing a 'lost message' failure path.
+ */
+ private volatile Throwable failedModification;
protected RemoteTransactionContext(TransactionIdentifier identifier, ActorSelection actor,
ActorContext actorContext, short remoteTransactionVersion, OperationLimiter limiter) {
this.actorContext = actorContext;
- private Future<Object> completeOperation(Future<Object> operationFuture) {
- operationFuture.onComplete(limiter, actorContext.getClientDispatcher());
- return operationFuture;
- }
private ActorSelection getActor() {
return actor;
return actorContext;
- protected Future<Object> executeOperationAsync(SerializableMessage msg, Timeout timeout) {
- return completeOperation(actorContext.executeOperationAsync(getActor(), msg.toSerializable(), timeout));
- }
public void closeTransaction() {
LOG.debug("Tx {} closeTransaction called", getIdentifier());
return new BatchedModifications(getIdentifier(), getTransactionVersion());
- private void batchModification(Modification modification) {
+ private void batchModification(Modification modification, boolean havePermit) {
+ if (havePermit) {
+ ++batchPermits;
+ }
if (batchedModifications == null) {
batchedModifications = newBatchedModifications();
final BatchedModifications toSend = batchedModifications;
+ final int permitsToRelease = batchPermits;
+ batchPermits = 0;
if (ready) {
batchedModifications = null;
} else {
batchedModifications = newBatchedModifications();
+ final Throwable failure = failedModification;
+ if (failure != null) {
+ // We have observed a modification failure, it does not make sense to send this batch. This speeds
+ // up the time when the application could be blocked due to messages timing out and operation
+ // limiter kicking in.
+ LOG.debug("Tx {} modifications previously failed, not sending a non-ready batch", getIdentifier());
+ limiter.release(permitsToRelease);
+ return Futures.failed(failure);
+ }
- sent = executeOperationAsync(toSend, actorContext.getTransactionCommitOperationTimeout());
+ sent = actorContext.executeOperationAsync(getActor(), toSend.toSerializable(),
+ actorContext.getTransactionCommitOperationTimeout());
+ sent.onComplete(new OnComplete<Object>() {
+ @Override
+ public void onComplete(Throwable failure, Object success) {
+ if (failure != null) {
+ LOG.debug("Tx {} modifications failed", getIdentifier(), failure);
+ failedModification = failure;
+ } else {
+ LOG.debug("Tx {} modifications completed with {}", getIdentifier(), success);
+ }
+ limiter.release(permitsToRelease);
+ }
+ }, actorContext.getClientDispatcher());
return sent;
LOG.debug("Tx {} executeModification {} called path = {}", getIdentifier(),
modification.getClass().getSimpleName(), modification.getPath());
- acquireOperation();
- batchModification(modification);
+ final boolean havePermit = failedModification == null && acquireOperation();
+ batchModification(modification, havePermit);
LOG.debug("Tx {} executeRead {} called path = {}", getIdentifier(), readCmd.getClass().getSimpleName(),
+ final Throwable failure = failedModification;
+ if (failure != null) {
+ // If we know there was a previous modification failure, we must not send a read request, as it risks
+ // returning incorrect data. We check this before acquiring an operation simply because we want the app
+ // to complete this transaction as soon as possible.
+ returnFuture.setException(new ReadFailedException("Previous modification failed, cannot "
+ + readCmd.getClass().getSimpleName() + " for path " + readCmd.getPath(), failure));
+ return;
+ }
// Send any batched modifications. This is necessary to honor the read uncommitted semantics of the
// public API contract.
- acquireOperation();
+ final boolean havePermit = acquireOperation();
OnComplete<Object> onComplete = new OnComplete<Object>() {
- public void onComplete(Throwable failure, Object response) throws Throwable {
+ public void onComplete(Throwable failure, Object response) {
+ // We have previously acquired an operation, now release it, no matter what happened
+ if (havePermit) {
+ limiter.release();
+ }
if (failure != null) {
LOG.debug("Tx {} {} operation failed: {}", getIdentifier(), readCmd.getClass().getSimpleName(),
- Future<Object> future = executeOperationAsync(readCmd.asVersion(getTransactionVersion()),
- actorContext.getOperationTimeout());
+ final Future<Object> future = actorContext.executeOperationAsync(getActor(),
+ readCmd.asVersion(getTransactionVersion()).toSerializable(), actorContext.getOperationTimeout());
future.onComplete(onComplete, actorContext.getClientDispatcher());
- * Acquire operation from the limiter if the hand-off has completed. If
- * the hand-off is still ongoing, this method does nothing.
+ * Acquire operation from the limiter if the hand-off has completed. If the hand-off is still ongoing, this method
+ * does nothing.
+ *
+ * @return True if a permit was successfully acquired, false otherwise
- private void acquireOperation() {
- if (isOperationHandOffComplete()) {
- limiter.acquire();
- }
+ private boolean acquireOperation() {
+ return isOperationHandOffComplete() && limiter.acquire();
+++ /dev/null
- * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.datastore;
-import static org.junit.Assert.assertEquals;
-import org.junit.Test;
-import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
-import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
-import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
- * Unit tests for OperationCompleter.
- *
- * @author Thomas Pantelis
- */
-public class OperationLimiterTest {
- private final TransactionIdentifier transactionId = MockIdentifiers.transactionIdentifier(
- OperationLimiterTest.class, "mock");
- @Test
- public void testOnComplete() throws Exception {
- int permits = 10;
- OperationLimiter limiter = new OperationLimiter(transactionId, permits, 1);
- limiter.acquire(permits);
- int availablePermits = 0;
- limiter.onComplete(null, new DataExistsReply(true, DataStoreVersions.CURRENT_VERSION));
- assertEquals("availablePermits", ++availablePermits, limiter.availablePermits());
- limiter.onComplete(null, new DataExistsReply(true, DataStoreVersions.CURRENT_VERSION));
- assertEquals("availablePermits", ++availablePermits, limiter.availablePermits());
- limiter.onComplete(null, new IllegalArgumentException());
- assertEquals("availablePermits", ++availablePermits, limiter.availablePermits());
- limiter.onComplete(null, new BatchedModificationsReply(4));
- availablePermits += 4;
- assertEquals("availablePermits", availablePermits, limiter.availablePermits());
- }
--- /dev/null
+ * Copyright (c) 2017 Pantheon Technologies, s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+import akka.actor.ActorRef;
+import akka.actor.Status.Failure;
+import akka.dispatch.ExecutionContexts;
+import akka.dispatch.OnComplete;
+import akka.testkit.javadsl.TestKit;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.SettableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
+import org.opendaylight.controller.cluster.access.concepts.FrontendIdentifier;
+import org.opendaylight.controller.cluster.access.concepts.FrontendType;
+import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
+import org.opendaylight.controller.cluster.access.concepts.MemberName;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+import org.opendaylight.controller.cluster.datastore.config.Configuration;
+import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
+import org.opendaylight.controller.cluster.datastore.messages.DataExists;
+import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
+import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+ * Test whether RemoteTransactionContext operates correctly.
+ */
+public class RemoteTransactionContextTest extends AbstractActorTest {
+ private static final TransactionIdentifier TX_ID = new TransactionIdentifier(new LocalHistoryIdentifier(
+ ClientIdentifier.create(FrontendIdentifier.create(MemberName.forName("test"), FrontendType.forName("test")), 0),
+ 0), 0);
+ private static final DeleteModification DELETE = new DeleteModification(DataStoreVersions.CURRENT_VERSION);
+ private OperationLimiter limiter;
+ private RemoteTransactionContext txContext;
+ private ActorContext actorContext;
+ private TestKit kit;
+ @Before
+ public void before() { // Builds a fresh limiter and transaction context, wired to a TestKit probe, before each test.
+ kit = new TestKit(getSystem());
+ actorContext = Mockito.spy(new ActorContext(getSystem(), kit.getRef(), mock(ClusterWrapper.class),
+ mock(Configuration.class)));
+ limiter = new OperationLimiter(TX_ID, 4, 0); // 4 permits; third argument is presumably the acquire timeout (0 = do not wait) - confirm
+ txContext = new RemoteTransactionContext(TX_ID, actorContext.actorSelection(kit.getRef().path()), actorContext,
+ DataStoreVersions.CURRENT_VERSION, limiter); // the transaction's remote actor is the TestKit probe, so the tests can inspect what is sent
+ txContext.operationHandOffComplete(); // acquireOperation() only takes a permit when isOperationHandOffComplete() holds; presumably this call enables that
+ }
+ /**
+ * OperationLimiter should be correctly released when a failure, like AskTimeoutException, occurs. Future reads
+ * need to complete immediately with the failure, and future modifications should not be throttled but rather be
+ * thrown away immediately.
+ */
+ @Test
+ public void testLimiterOnFailure() throws TimeoutException, InterruptedException { // Checks permit release and short-circuiting of later operations after a failed batch.
+ txContext.executeModification(DELETE);
+ txContext.executeModification(DELETE);
+ assertEquals(2, limiter.availablePermits()); // each modification took one of the 4 permits
+ Future<Object> future = txContext.sendBatchedModifications();
+ assertEquals(2, limiter.availablePermits()); // permits are only released from the batch future's completion callback
+ BatchedModifications msg = kit.expectMsgClass(BatchedModifications.class);
+ assertEquals(2, msg.getModifications().size());
+ assertEquals(1, msg.getTotalMessagesSent());
+ sendReply(new Failure(new NullPointerException()));
+ assertFuture(future, new OnComplete<Object>() {
+ @Override
+ public void onComplete(final Throwable failure, final Object success) {
+ assertTrue(failure instanceof NullPointerException);
+ assertEquals(4, limiter.availablePermits()); // the two permits held by the batch have been returned
+ // The transaction has failed, no throttling should occur
+ txContext.executeModification(DELETE); // with failedModification set, executeModification() does not consult the limiter
+ assertEquals(4, limiter.availablePermits());
+ // Executing a read should result in immediate failure
+ final SettableFuture<Boolean> readFuture = SettableFuture.create();
+ txContext.executeRead(new DataExists(), readFuture); // with failedModification set, executeRead() fails the future up front
+ assertTrue(readFuture.isDone());
+ try {
+ readFuture.get();
+ fail("Read future did not fail");
+ } catch (ExecutionException | InterruptedException e) {
+ assertTrue(e.getCause() instanceof NullPointerException); // NOTE(review): executeRead() wraps the prior failure in a ReadFailedException, so getCause() is likely that wrapper - confirm
+ }
+ }
+ });
+ future = txContext.directCommit();
+ msg = kit.expectMsgClass(BatchedModifications.class);
+ // Modification should have been thrown away by the dropped transmit induced by executeRead()
+ assertEquals(0, msg.getModifications().size());
+ assertTrue(msg.isDoCommitOnReady());
+ assertTrue(msg.isReady());
+ assertEquals(2, msg.getTotalMessagesSent()); // the failed first batch plus this ready batch
+ sendReply(new Failure(new IllegalStateException()));
+ assertFuture(future, new OnComplete<Object>() {
+ @Override
+ public void onComplete(final Throwable failure, final Object success) {
+ assertTrue(failure instanceof IllegalStateException);
+ }
+ });
+ kit.expectNoMsg(); // nothing further may be sent to the backend
+ }
+ /**
+ * OperationLimiter gives up throttling at some point -- {@link RemoteTransactionContext} needs to deal with that
+ * case, too.
+ */
+ @Test
+ public void testLimiterOnOverflowFailure() throws TimeoutException, InterruptedException { // Checks that a modification batched without a permit does not over-release the limiter.
+ txContext.executeModification(DELETE);
+ txContext.executeModification(DELETE);
+ txContext.executeModification(DELETE);
+ txContext.executeModification(DELETE);
+ assertEquals(0, limiter.availablePermits()); // all 4 permits are now held by batched modifications
+ txContext.executeModification(DELETE); // fifth modification: still batched, but without a permit being counted for it
+ // Last acquire should have failed ...
+ assertEquals(0, limiter.availablePermits());
+ Future<Object> future = txContext.sendBatchedModifications();
+ assertEquals(0, limiter.availablePermits()); // permits are only released from the batch future's completion callback
+ BatchedModifications msg = kit.expectMsgClass(BatchedModifications.class);
+ // ... so we are sending 5 modifications ...
+ assertEquals(5, msg.getModifications().size());
+ assertEquals(1, msg.getTotalMessagesSent());
+ sendReply(new Failure(new NullPointerException()));
+ assertFuture(future, new OnComplete<Object>() {
+ @Override
+ public void onComplete(final Throwable failure, final Object success) {
+ assertTrue(failure instanceof NullPointerException);
+ // ... but they account for only 4 permits.
+ assertEquals(4, limiter.availablePermits());
+ }
+ });
+ kit.expectNoMsg(); // nothing further may be sent to the backend
+ }
+ /**
+ * Replies to the sender of the last message received by the test kit and waits for that sender to terminate.
+ *
+ * @param message the reply to deliver to the waiting sender (presumably the temporary ask actor)
+ */
+ private void sendReply(final Object message) {
+ final ActorRef askActor = kit.getLastSender();
+ kit.watch(askActor);
+ // Deliver the caller-supplied reply rather than a hard-coded failure, so each test controls the outcome it asserts.
+ kit.reply(message);
+ kit.expectTerminated(askActor);
+ }
+ private static void assertFuture(final Future<Object> future, final OnComplete<Object> complete)
+ throws TimeoutException, InterruptedException {
+ Await.ready(future, FiniteDuration.apply(3, TimeUnit.SECONDS));
+ future.onComplete(complete, ExecutionContexts.fromExecutor(MoreExecutors.directExecutor()));
+ }