import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
import org.slf4j.LoggerFactory;
import scala.concurrent.Future;
import scala.runtime.AbstractFunction1;
-import java.util.Collections;
-import java.util.List;
/**
* ThreePhaseCommitCohortProxy represents a set of remote cohort proxies
private final List<Future<ActorSelection>> cohortFutures;
private volatile List<ActorSelection> cohorts;
private final String transactionId;
+ private static final OperationCallback NO_OP_CALLBACK = new OperationCallback() {
+ @Override
+ public void run() {
+ }
+
+ @Override
+ public void success() {
+ }
+
+ @Override
+ public void failure() {
+ }
+ };
public ThreePhaseCommitCohortProxy(ActorContext actorContext,
List<Future<ActorSelection>> cohortFutures, String transactionId) {
private Future<Void> buildCohortList() {
Future<Iterable<ActorSelection>> combinedFutures = Futures.sequence(cohortFutures,
- actorContext.getActorSystem().dispatcher());
+ actorContext.getClientDispatcher());
return combinedFutures.transform(new AbstractFunction1<Iterable<ActorSelection>, Void>() {
@Override
}
return null;
}
- }, TransactionProxy.SAME_FAILURE_TRANSFORMER, actorContext.getActorSystem().dispatcher());
+ }, TransactionProxy.SAME_FAILURE_TRANSFORMER, actorContext.getClientDispatcher());
}
@Override
finishCanCommit(returnFuture);
}
}
- }, actorContext.getActorSystem().dispatcher());
+ }, actorContext.getClientDispatcher());
return returnFuture;
}
if(LOG.isDebugEnabled()) {
LOG.debug("Tx {} finishCanCommit", transactionId);
}
- // The last phase of canCommit is to invoke all the cohort actors asynchronously to perform
- // their canCommit processing. If any one fails then we'll fail canCommit.
- Future<Iterable<Object>> combinedFuture =
- invokeCohorts(new CanCommitTransaction(transactionId).toSerializable());
+ // For empty transactions return immediately
+ if(cohorts.size() == 0){
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Tx {}: canCommit returning result: {}", transactionId, true);
+ }
+ returnFuture.set(Boolean.TRUE);
+ return;
+ }
- combinedFuture.onComplete(new OnComplete<Iterable<Object>>() {
+ final Object message = new CanCommitTransaction(transactionId).toSerializable();
+
+ final Iterator<ActorSelection> iterator = cohorts.iterator();
+
+ final OnComplete<Object> onComplete = new OnComplete<Object>() {
@Override
- public void onComplete(Throwable failure, Iterable<Object> responses) throws Throwable {
- if(failure != null) {
- if(LOG.isDebugEnabled()) {
+ public void onComplete(Throwable failure, Object response) throws Throwable {
+ if (failure != null) {
+ if (LOG.isDebugEnabled()) {
LOG.debug("Tx {}: a canCommit cohort Future failed: {}", transactionId, failure);
}
returnFuture.setException(failure);
}
boolean result = true;
- for(Object response: responses) {
- if (response.getClass().equals(CanCommitTransactionReply.SERIALIZABLE_CLASS)) {
- CanCommitTransactionReply reply =
- CanCommitTransactionReply.fromSerializable(response);
- if (!reply.getCanCommit()) {
- result = false;
- break;
- }
- } else {
- LOG.error("Unexpected response type {}", response.getClass());
- returnFuture.setException(new IllegalArgumentException(
- String.format("Unexpected response type {}", response.getClass())));
- return;
+ if (response.getClass().equals(CanCommitTransactionReply.SERIALIZABLE_CLASS)) {
+ CanCommitTransactionReply reply =
+ CanCommitTransactionReply.fromSerializable(response);
+ if (!reply.getCanCommit()) {
+ result = false;
}
+ } else {
+ LOG.error("Unexpected response type {}", response.getClass());
+ returnFuture.setException(new IllegalArgumentException(
+ String.format("Unexpected response type %s", response.getClass())));
+ return;
}
- if(LOG.isDebugEnabled()) {
- LOG.debug("Tx {}: canCommit returning result: {}", transactionId, result);
+
+ if(iterator.hasNext() && result){
+ Future<Object> future = actorContext.executeOperationAsync(iterator.next(), message,
+ actorContext.getTransactionCommitOperationTimeout());
+ future.onComplete(this, actorContext.getClientDispatcher());
+ } else {
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Tx {}: canCommit returning result: {}", transactionId, result);
+ }
+ returnFuture.set(Boolean.valueOf(result));
}
- returnFuture.set(Boolean.valueOf(result));
+
}
- }, actorContext.getActorSystem().dispatcher());
+ };
+
+ Future<Object> future = actorContext.executeOperationAsync(iterator.next(), message,
+ actorContext.getTransactionCommitOperationTimeout());
+ future.onComplete(onComplete, actorContext.getClientDispatcher());
}
private Future<Iterable<Object>> invokeCohorts(Object message) {
if(LOG.isDebugEnabled()) {
LOG.debug("Tx {}: Sending {} to cohort {}", transactionId, message, cohort);
}
-
- futureList.add(actorContext.executeOperationAsync(cohort, message));
+ futureList.add(actorContext.executeOperationAsync(cohort, message, actorContext.getTransactionCommitOperationTimeout()));
}
- return Futures.sequence(futureList, actorContext.getActorSystem().dispatcher());
+ return Futures.sequence(futureList, actorContext.getClientDispatcher());
}
@Override
@Override
public ListenableFuture<Void> commit() {
- return voidOperation("commit", new CommitTransaction(transactionId).toSerializable(),
- CommitTransactionReply.SERIALIZABLE_CLASS, true);
+ OperationCallback operationCallback = cohortFutures.isEmpty() ? NO_OP_CALLBACK :
+ new TransactionRateLimitingCallback(actorContext);
+
+ return voidOperation("commit", new CommitTransaction(transactionId).toSerializable(),
+ CommitTransactionReply.SERIALIZABLE_CLASS, true, operationCallback);
}
private ListenableFuture<Void> voidOperation(final String operationName, final Object message,
- final Class<?> expectedResponseClass, final boolean propagateException) {
+ final Class<?> expectedResponseClass, final boolean propagateException) {
+ return voidOperation(operationName, message, expectedResponseClass, propagateException, NO_OP_CALLBACK);
+ }
+
+ private ListenableFuture<Void> voidOperation(final String operationName, final Object message,
+ final Class<?> expectedResponseClass, final boolean propagateException, final OperationCallback callback) {
if(LOG.isDebugEnabled()) {
LOG.debug("Tx {} {}", transactionId, operationName);
if(cohorts != null) {
finishVoidOperation(operationName, message, expectedResponseClass, propagateException,
- returnFuture);
+ returnFuture, callback);
} else {
buildCohortList().onComplete(new OnComplete<Void>() {
@Override
}
} else {
finishVoidOperation(operationName, message, expectedResponseClass,
- propagateException, returnFuture);
+ propagateException, returnFuture, callback);
}
}
- }, actorContext.getActorSystem().dispatcher());
+ }, actorContext.getClientDispatcher());
}
return returnFuture;
}
private void finishVoidOperation(final String operationName, final Object message,
- final Class<?> expectedResponseClass, final boolean propagateException,
- final SettableFuture<Void> returnFuture) {
+ final Class<?> expectedResponseClass, final boolean propagateException,
+ final SettableFuture<Void> returnFuture, final OperationCallback callback) {
if(LOG.isDebugEnabled()) {
LOG.debug("Tx {} finish {}", transactionId, operationName);
}
+
+ callback.run();
+
Future<Iterable<Object>> combinedFuture = invokeCohorts(message);
combinedFuture.onComplete(new OnComplete<Iterable<Object>>() {
for(Object response: responses) {
if(!response.getClass().equals(expectedResponseClass)) {
exceptionToPropagate = new IllegalArgumentException(
- String.format("Unexpected response type {}",
+ String.format("Unexpected response type %s",
response.getClass()));
break;
}
}
if(exceptionToPropagate != null) {
+
if(LOG.isDebugEnabled()) {
LOG.debug("Tx {}: a {} cohort Future failed: {}", transactionId,
operationName, exceptionToPropagate);
}
returnFuture.set(null);
}
+
+ callback.failure();
} else {
+
if(LOG.isDebugEnabled()) {
LOG.debug("Tx {}: {} succeeded", transactionId, operationName);
}
returnFuture.set(null);
+
+ callback.success();
}
}
- }, actorContext.getActorSystem().dispatcher());
+ }, actorContext.getClientDispatcher());
}
@VisibleForTesting