From 81f37aec697cd23d8a70cf1ad5dcb388e10a2093 Mon Sep 17 00:00:00 2001 From: Moiz Raja Date: Wed, 25 Mar 2015 17:43:36 -0700 Subject: [PATCH] BUG 2792 : ThreePhaseCommitCohortProxy should serialize CanCommit Besides the front-end ConcurrentDOMDataBroker the ThreePhaseCommitCohortProxy also sends CanCommitTransaction to different Shards. This can also cause us to get into the deadlock situation described in the bug and so we serialize the can commits. Change-Id: Iad527ce812f0b285cf41ce29abab30ec0f975aa1 Signed-off-by: Moiz Raja --- .../ThreePhaseCommitCohortProxy.java | 68 ++++++++++++------- .../ThreePhaseCommitCohortProxyTest.java | 10 ++- 2 files changed, 51 insertions(+), 27 deletions(-) diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxy.java index c479da7312..aeb4062103 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxy.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxy.java @@ -16,6 +16,7 @@ import com.google.common.collect.Lists; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; import java.util.Collections; +import java.util.Iterator; import java.util.List; import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction; import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply; @@ -117,17 +118,25 @@ public class ThreePhaseCommitCohortProxy implements DOMStoreThreePhaseCommitCoho if(LOG.isDebugEnabled()) { LOG.debug("Tx {} finishCanCommit", transactionId); } - // The last phase of canCommit is to invoke all the cohort actors asynchronously to perform - // their canCommit processing. If any one fails then we'll fail canCommit. - Future> combinedFuture = - invokeCohorts(new CanCommitTransaction(transactionId).toSerializable()); + // For empty transactions return immediately + if(cohorts.size() == 0){ + if(LOG.isDebugEnabled()) { + LOG.debug("Tx {}: canCommit returning result: {}", transactionId, true); + } + returnFuture.set(Boolean.TRUE); + return; + } - combinedFuture.onComplete(new OnComplete>() { + final Object message = new CanCommitTransaction(transactionId).toSerializable(); + + final Iterator iterator = cohorts.iterator(); + + final OnComplete onComplete = new OnComplete() { @Override - public void onComplete(Throwable failure, Iterable responses) throws Throwable { - if(failure != null) { - if(LOG.isDebugEnabled()) { + public void onComplete(Throwable failure, Object response) throws Throwable { + if (failure != null) { + if (LOG.isDebugEnabled()) { LOG.debug("Tx {}: a canCommit cohort Future failed: {}", transactionId, failure); } returnFuture.setException(failure); @@ -135,27 +144,36 @@ public class ThreePhaseCommitCohortProxy implements DOMStoreThreePhaseCommitCoho } boolean result = true; - for(Object response: responses) { - if (response.getClass().equals(CanCommitTransactionReply.SERIALIZABLE_CLASS)) { - CanCommitTransactionReply reply = - CanCommitTransactionReply.fromSerializable(response); - if (!reply.getCanCommit()) { - result = false; - break; - } - } else { - LOG.error("Unexpected response type {}", response.getClass()); - returnFuture.setException(new IllegalArgumentException( - String.format("Unexpected response type %s", response.getClass()))); - return; + if (response.getClass().equals(CanCommitTransactionReply.SERIALIZABLE_CLASS)) { + CanCommitTransactionReply reply = + CanCommitTransactionReply.fromSerializable(response); + if (!reply.getCanCommit()) { + result = false; } + } else { + LOG.error("Unexpected response type {}", response.getClass()); + returnFuture.setException(new IllegalArgumentException( + String.format("Unexpected response type %s", response.getClass()))); + return; } - if(LOG.isDebugEnabled()) { - LOG.debug("Tx {}: canCommit returning result: {}", transactionId, result); + + if(iterator.hasNext() && result){ + Future future = actorContext.executeOperationAsync(iterator.next(), message, + actorContext.getTransactionCommitOperationTimeout()); + future.onComplete(this, actorContext.getClientDispatcher()); + } else { + if(LOG.isDebugEnabled()) { + LOG.debug("Tx {}: canCommit returning result: {}", transactionId, result); + } + returnFuture.set(Boolean.valueOf(result)); } - returnFuture.set(Boolean.valueOf(result)); + } - }, actorContext.getClientDispatcher()); + }; + + Future future = actorContext.executeOperationAsync(iterator.next(), message, + actorContext.getTransactionCommitOperationTimeout()); + future.onComplete(onComplete, actorContext.getClientDispatcher()); } private Future> invokeCohorts(Object message) { diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxyTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxyTest.java index 647b6e7b54..d595adc8bb 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxyTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxyTest.java @@ -38,6 +38,7 @@ import org.opendaylight.controller.cluster.datastore.messages.SerializableMessag import org.opendaylight.controller.cluster.datastore.utils.ActorContext; import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor; import scala.concurrent.Future; +import scala.concurrent.duration.Duration; public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest { @@ -116,6 +117,9 @@ public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest { stubber.when(actorContext).executeOperationAsync(any(ActorSelection.class), isA(requestType), any(Timeout.class)); + + doReturn(new Timeout(Duration.apply(1000, TimeUnit.MILLISECONDS))) + .when(actorContext).getTransactionCommitOperationTimeout(); } private void verifyCohortInvocations(int nCohorts, Class requestType) { @@ -180,9 +184,11 @@ public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest { ListenableFuture future = proxy.canCommit(); - assertEquals("canCommit", false, future.get(5, TimeUnit.SECONDS)); + Boolean actual = future.get(5, TimeUnit.SECONDS); - verifyCohortInvocations(3, CanCommitTransaction.SERIALIZABLE_CLASS); + assertEquals("canCommit", false, actual); + + verifyCohortInvocations(2, CanCommitTransaction.SERIALIZABLE_CLASS); } @Test(expected = TestException.class) -- 2.36.6