import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import java.util.Collections;
+import java.util.Iterator;
import java.util.List;
import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
if(LOG.isDebugEnabled()) {
LOG.debug("Tx {} finishCanCommit", transactionId);
}
- // The last phase of canCommit is to invoke all the cohort actors asynchronously to perform
- // their canCommit processing. If any one fails then we'll fail canCommit.
- Future<Iterable<Object>> combinedFuture =
- invokeCohorts(new CanCommitTransaction(transactionId).toSerializable());
+ // For empty transactions return immediately
+ if(cohorts.size() == 0){
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Tx {}: canCommit returning result: {}", transactionId, true);
+ }
+ returnFuture.set(Boolean.TRUE);
+ return;
+ }
- combinedFuture.onComplete(new OnComplete<Iterable<Object>>() {
+ final Object message = new CanCommitTransaction(transactionId).toSerializable();
+
+ final Iterator<ActorSelection> iterator = cohorts.iterator();
+
+ final OnComplete<Object> onComplete = new OnComplete<Object>() {
@Override
- public void onComplete(Throwable failure, Iterable<Object> responses) throws Throwable {
- if(failure != null) {
- if(LOG.isDebugEnabled()) {
+ public void onComplete(Throwable failure, Object response) throws Throwable {
+ if (failure != null) {
+ if (LOG.isDebugEnabled()) {
LOG.debug("Tx {}: a canCommit cohort Future failed: {}", transactionId, failure);
}
returnFuture.setException(failure);
}
boolean result = true;
- for(Object response: responses) {
- if (response.getClass().equals(CanCommitTransactionReply.SERIALIZABLE_CLASS)) {
- CanCommitTransactionReply reply =
- CanCommitTransactionReply.fromSerializable(response);
- if (!reply.getCanCommit()) {
- result = false;
- break;
- }
- } else {
- LOG.error("Unexpected response type {}", response.getClass());
- returnFuture.setException(new IllegalArgumentException(
- String.format("Unexpected response type %s", response.getClass())));
- return;
+ if (response.getClass().equals(CanCommitTransactionReply.SERIALIZABLE_CLASS)) {
+ CanCommitTransactionReply reply =
+ CanCommitTransactionReply.fromSerializable(response);
+ if (!reply.getCanCommit()) {
+ result = false;
}
+ } else {
+ LOG.error("Unexpected response type {}", response.getClass());
+ returnFuture.setException(new IllegalArgumentException(
+ String.format("Unexpected response type %s", response.getClass())));
+ return;
}
- if(LOG.isDebugEnabled()) {
- LOG.debug("Tx {}: canCommit returning result: {}", transactionId, result);
+
+ if(iterator.hasNext() && result){
+ Future<Object> future = actorContext.executeOperationAsync(iterator.next(), message,
+ actorContext.getTransactionCommitOperationTimeout());
+ future.onComplete(this, actorContext.getClientDispatcher());
+ } else {
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Tx {}: canCommit returning result: {}", transactionId, result);
+ }
+ returnFuture.set(Boolean.valueOf(result));
}
- returnFuture.set(Boolean.valueOf(result));
+
}
- }, actorContext.getClientDispatcher());
+ };
+
+ Future<Object> future = actorContext.executeOperationAsync(iterator.next(), message,
+ actorContext.getTransactionCommitOperationTimeout());
+ future.onComplete(onComplete, actorContext.getClientDispatcher());
}
private Future<Iterable<Object>> invokeCohorts(Object message) {
import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor;
import scala.concurrent.Future;
+import scala.concurrent.duration.Duration;
public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest {
stubber.when(actorContext).executeOperationAsync(any(ActorSelection.class),
isA(requestType), any(Timeout.class));
+
+ doReturn(new Timeout(Duration.apply(1000, TimeUnit.MILLISECONDS)))
+ .when(actorContext).getTransactionCommitOperationTimeout();
}
private void verifyCohortInvocations(int nCohorts, Class<?> requestType) {
ListenableFuture<Boolean> future = proxy.canCommit();
- assertEquals("canCommit", false, future.get(5, TimeUnit.SECONDS));
+ Boolean actual = future.get(5, TimeUnit.SECONDS);
- verifyCohortInvocations(3, CanCommitTransaction.SERIALIZABLE_CLASS);
+ assertEquals("canCommit", false, actual);
+
+ verifyCohortInvocations(2, CanCommitTransaction.SERIALIZABLE_CLASS);
}
@Test(expected = TestException.class)