Merge "BUG 2792 : ThreePhaseCommitCohortProxy should serialize CanCommit"
authorTom Pantelis <tpanteli@brocade.com>
Fri, 27 Mar 2015 12:56:43 +0000 (12:56 +0000)
committerGerrit Code Review <gerrit@opendaylight.org>
Fri, 27 Mar 2015 12:56:43 +0000 (12:56 +0000)
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxy.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxyTest.java

index c479da73127760977d4c27b3ff9873d10c295c57..aeb4062103d1c0fc77c29f0c733447728a60194f 100644 (file)
@@ -16,6 +16,7 @@ import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.SettableFuture;
 import java.util.Collections;
+import java.util.Iterator;
 import java.util.List;
 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
@@ -117,17 +118,25 @@ public class ThreePhaseCommitCohortProxy implements DOMStoreThreePhaseCommitCoho
         if(LOG.isDebugEnabled()) {
             LOG.debug("Tx {} finishCanCommit", transactionId);
         }
-        // The last phase of canCommit is to invoke all the cohort actors asynchronously to perform
-        // their canCommit processing. If any one fails then we'll fail canCommit.
 
-        Future<Iterable<Object>> combinedFuture =
-                invokeCohorts(new CanCommitTransaction(transactionId).toSerializable());
+        // For empty transactions return immediately
+        if(cohorts.size() == 0){
+            if(LOG.isDebugEnabled()) {
+                LOG.debug("Tx {}: canCommit returning result: {}", transactionId, true);
+            }
+            returnFuture.set(Boolean.TRUE);
+            return;
+        }
 
-        combinedFuture.onComplete(new OnComplete<Iterable<Object>>() {
+        final Object message = new CanCommitTransaction(transactionId).toSerializable();
+
+        final Iterator<ActorSelection> iterator = cohorts.iterator();
+
+        final OnComplete<Object> onComplete = new OnComplete<Object>() {
             @Override
-            public void onComplete(Throwable failure, Iterable<Object> responses) throws Throwable {
-                if(failure != null) {
-                    if(LOG.isDebugEnabled()) {
+            public void onComplete(Throwable failure, Object response) throws Throwable {
+                if (failure != null) {
+                    if (LOG.isDebugEnabled()) {
                         LOG.debug("Tx {}: a canCommit cohort Future failed: {}", transactionId, failure);
                     }
                     returnFuture.setException(failure);
@@ -135,27 +144,36 @@ public class ThreePhaseCommitCohortProxy implements DOMStoreThreePhaseCommitCoho
                 }
 
                 boolean result = true;
-                for(Object response: responses) {
-                    if (response.getClass().equals(CanCommitTransactionReply.SERIALIZABLE_CLASS)) {
-                        CanCommitTransactionReply reply =
-                                CanCommitTransactionReply.fromSerializable(response);
-                        if (!reply.getCanCommit()) {
-                            result = false;
-                            break;
-                        }
-                    } else {
-                        LOG.error("Unexpected response type {}", response.getClass());
-                        returnFuture.setException(new IllegalArgumentException(
-                                String.format("Unexpected response type %s", response.getClass())));
-                        return;
+                if (response.getClass().equals(CanCommitTransactionReply.SERIALIZABLE_CLASS)) {
+                    CanCommitTransactionReply reply =
+                            CanCommitTransactionReply.fromSerializable(response);
+                    if (!reply.getCanCommit()) {
+                        result = false;
                     }
+                } else {
+                    LOG.error("Unexpected response type {}", response.getClass());
+                    returnFuture.setException(new IllegalArgumentException(
+                            String.format("Unexpected response type %s", response.getClass())));
+                    return;
                 }
-                if(LOG.isDebugEnabled()) {
-                    LOG.debug("Tx {}: canCommit returning result: {}", transactionId, result);
+
+                if(iterator.hasNext() && result){
+                    Future<Object> future = actorContext.executeOperationAsync(iterator.next(), message,
+                            actorContext.getTransactionCommitOperationTimeout());
+                    future.onComplete(this, actorContext.getClientDispatcher());
+                } else {
+                    if(LOG.isDebugEnabled()) {
+                        LOG.debug("Tx {}: canCommit returning result: {}", transactionId, result);
+                    }
+                    returnFuture.set(Boolean.valueOf(result));
                 }
-                returnFuture.set(Boolean.valueOf(result));
+
             }
-        }, actorContext.getClientDispatcher());
+        };
+
+        Future<Object> future = actorContext.executeOperationAsync(iterator.next(), message,
+                actorContext.getTransactionCommitOperationTimeout());
+        future.onComplete(onComplete, actorContext.getClientDispatcher());
     }
 
     private Future<Iterable<Object>> invokeCohorts(Object message) {
index 647b6e7b542508953bb9750555aaf193c0ba2864..d595adc8bb80c0a98701526b2240b757c1f2d6c6 100644 (file)
@@ -38,6 +38,7 @@ import org.opendaylight.controller.cluster.datastore.messages.SerializableMessag
 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
 import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor;
 import scala.concurrent.Future;
+import scala.concurrent.duration.Duration;
 
 public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest {
 
@@ -116,6 +117,9 @@ public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest {
 
         stubber.when(actorContext).executeOperationAsync(any(ActorSelection.class),
                 isA(requestType), any(Timeout.class));
+
+        doReturn(new Timeout(Duration.apply(1000, TimeUnit.MILLISECONDS)))
+                .when(actorContext).getTransactionCommitOperationTimeout();
     }
 
     private void verifyCohortInvocations(int nCohorts, Class<?> requestType) {
@@ -180,9 +184,11 @@ public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest {
 
         ListenableFuture<Boolean> future = proxy.canCommit();
 
-        assertEquals("canCommit", false, future.get(5, TimeUnit.SECONDS));
+        Boolean actual = future.get(5, TimeUnit.SECONDS);
 
-        verifyCohortInvocations(3, CanCommitTransaction.SERIALIZABLE_CLASS);
+        assertEquals("canCommit", false, actual);
+
+        verifyCohortInvocations(2, CanCommitTransaction.SERIALIZABLE_CLASS);
     }
 
     @Test(expected = TestException.class)