BUG 2792 : ThreePhaseCommitCohortProxy should serialize CanCommit 39/17139/3
authorMoiz Raja <moraja@cisco.com>
Thu, 26 Mar 2015 00:43:36 +0000 (17:43 -0700)
committerMoiz Raja <moraja@cisco.com>
Fri, 27 Mar 2015 10:06:37 +0000 (03:06 -0700)
Besides the front-end ConcurrentDOMDataBroker the ThreePhaseCommitCohortProxy
also sends CanCommitTransaction to different Shards. This can also cause us
to get into the deadlock situation described in the bug and so we serialize the
can commits.

Change-Id: Iad527ce812f0b285cf41ce29abab30ec0f975aa1
Signed-off-by: Moiz Raja <moraja@cisco.com>
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxy.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxyTest.java

index c479da73127760977d4c27b3ff9873d10c295c57..aeb4062103d1c0fc77c29f0c733447728a60194f 100644 (file)
@@ -16,6 +16,7 @@ import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.SettableFuture;
 import java.util.Collections;
+import java.util.Iterator;
 import java.util.List;
 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
@@ -117,17 +118,25 @@ public class ThreePhaseCommitCohortProxy implements DOMStoreThreePhaseCommitCoho
         if(LOG.isDebugEnabled()) {
             LOG.debug("Tx {} finishCanCommit", transactionId);
         }
-        // The last phase of canCommit is to invoke all the cohort actors asynchronously to perform
-        // their canCommit processing. If any one fails then we'll fail canCommit.
 
-        Future<Iterable<Object>> combinedFuture =
-                invokeCohorts(new CanCommitTransaction(transactionId).toSerializable());
+        // For empty transactions return immediately
+        if(cohorts.size() == 0){
+            if(LOG.isDebugEnabled()) {
+                LOG.debug("Tx {}: canCommit returning result: {}", transactionId, true);
+            }
+            returnFuture.set(Boolean.TRUE);
+            return;
+        }
 
-        combinedFuture.onComplete(new OnComplete<Iterable<Object>>() {
+        final Object message = new CanCommitTransaction(transactionId).toSerializable();
+
+        final Iterator<ActorSelection> iterator = cohorts.iterator();
+
+        final OnComplete<Object> onComplete = new OnComplete<Object>() {
             @Override
-            public void onComplete(Throwable failure, Iterable<Object> responses) throws Throwable {
-                if(failure != null) {
-                    if(LOG.isDebugEnabled()) {
+            public void onComplete(Throwable failure, Object response) throws Throwable {
+                if (failure != null) {
+                    if (LOG.isDebugEnabled()) {
                         LOG.debug("Tx {}: a canCommit cohort Future failed: {}", transactionId, failure);
                     }
                     returnFuture.setException(failure);
@@ -135,27 +144,36 @@ public class ThreePhaseCommitCohortProxy implements DOMStoreThreePhaseCommitCoho
                 }
 
                 boolean result = true;
-                for(Object response: responses) {
-                    if (response.getClass().equals(CanCommitTransactionReply.SERIALIZABLE_CLASS)) {
-                        CanCommitTransactionReply reply =
-                                CanCommitTransactionReply.fromSerializable(response);
-                        if (!reply.getCanCommit()) {
-                            result = false;
-                            break;
-                        }
-                    } else {
-                        LOG.error("Unexpected response type {}", response.getClass());
-                        returnFuture.setException(new IllegalArgumentException(
-                                String.format("Unexpected response type %s", response.getClass())));
-                        return;
+                if (response.getClass().equals(CanCommitTransactionReply.SERIALIZABLE_CLASS)) {
+                    CanCommitTransactionReply reply =
+                            CanCommitTransactionReply.fromSerializable(response);
+                    if (!reply.getCanCommit()) {
+                        result = false;
                     }
+                } else {
+                    LOG.error("Unexpected response type {}", response.getClass());
+                    returnFuture.setException(new IllegalArgumentException(
+                            String.format("Unexpected response type %s", response.getClass())));
+                    return;
                 }
-                if(LOG.isDebugEnabled()) {
-                    LOG.debug("Tx {}: canCommit returning result: {}", transactionId, result);
+
+                if(iterator.hasNext() && result){
+                    Future<Object> future = actorContext.executeOperationAsync(iterator.next(), message,
+                            actorContext.getTransactionCommitOperationTimeout());
+                    future.onComplete(this, actorContext.getClientDispatcher());
+                } else {
+                    if(LOG.isDebugEnabled()) {
+                        LOG.debug("Tx {}: canCommit returning result: {}", transactionId, result);
+                    }
+                    returnFuture.set(Boolean.valueOf(result));
                 }
-                returnFuture.set(Boolean.valueOf(result));
+
             }
-        }, actorContext.getClientDispatcher());
+        };
+
+        Future<Object> future = actorContext.executeOperationAsync(iterator.next(), message,
+                actorContext.getTransactionCommitOperationTimeout());
+        future.onComplete(onComplete, actorContext.getClientDispatcher());
     }
 
     private Future<Iterable<Object>> invokeCohorts(Object message) {
index 647b6e7b542508953bb9750555aaf193c0ba2864..d595adc8bb80c0a98701526b2240b757c1f2d6c6 100644 (file)
@@ -38,6 +38,7 @@ import org.opendaylight.controller.cluster.datastore.messages.SerializableMessag
 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
 import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor;
 import scala.concurrent.Future;
+import scala.concurrent.duration.Duration;
 
 public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest {
 
@@ -116,6 +117,9 @@ public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest {
 
         stubber.when(actorContext).executeOperationAsync(any(ActorSelection.class),
                 isA(requestType), any(Timeout.class));
+
+        doReturn(new Timeout(Duration.apply(1000, TimeUnit.MILLISECONDS)))
+                .when(actorContext).getTransactionCommitOperationTimeout();
     }
 
     private void verifyCohortInvocations(int nCohorts, Class<?> requestType) {
@@ -180,9 +184,11 @@ public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest {
 
         ListenableFuture<Boolean> future = proxy.canCommit();
 
-        assertEquals("canCommit", false, future.get(5, TimeUnit.SECONDS));
+        Boolean actual = future.get(5, TimeUnit.SECONDS);
 
-        verifyCohortInvocations(3, CanCommitTransaction.SERIALIZABLE_CLASS);
+        assertEquals("canCommit", false, actual);
+
+        verifyCohortInvocations(2, CanCommitTransaction.SERIALIZABLE_CLASS);
     }
 
     @Test(expected = TestException.class)