From 2088e1ffef92b11e9d877b2a51280b142a84c58b Mon Sep 17 00:00:00 2001
From: Tom Pantelis
Date: Wed, 1 Jul 2015 18:59:51 -0400
Subject: [PATCH] CDS: Changes to Tx abort in Shard

I noticed that when a tx times out on the front-end during CAN_COMMIT,
the front-end tries to abort the tx, but it may not get aborted in the
Shard and the front-end gets an AskTimeoutException on the abort. The
reason is that the Shard only processes the abort request if the tx is
the current tx being committed. If it isn't, the request is ignored and
no response is sent, resulting in the front-end timeout.

I think it makes sense to also process the abort if the tx is sitting
in the queue awaiting CAN_COMMIT. If the front-end says to abort for
any reason, the Shard should honor it. Also, if the tx isn't aborted,
the Shard may dequeue it sometime later and attempt to commit it, which
can lead to unpredictable results if prior commits failed.

As per the comments in the Helium patch, I did some refactoring to make
it a bit cleaner. I moved the abort code from the Shard to the
ShardCommitCoordinator. This makes it consistent with the other tx
phases, where the Shard mostly delegates to the ShardCommitCoordinator.
I also removed the getCohort method from CohortEntry and added
appropriate methods so the internal cohort instance isn't exposed.

There's more refactoring/cleanup that can be done re: Futures, and
CohortEntry could also be moved into its own class (it's large enough),
but I don't want to overload this patch.

Change-Id: I73c79a5e4a2b39b7ee4d97a011de2d29b050dbc4
Signed-off-by: Tom Pantelis
(cherry picked from commit 70e0f223f41ab77de24b6df940d12acd39279e9a)
---
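Reviewer note, not part of the commit: the condensed sketch below
illustrates the abort flow this patch moves into
ShardCommitCoordinator.handleAbort(). AbortFlowSketch and Entry are
illustrative stand-ins, not the real classes; actor replies, MBean
counters and logging are reduced to a boolean result.

    import java.util.HashMap;
    import java.util.Map;

    class AbortFlowSketch {
        static final class Entry {
            boolean aborted;
        }

        private final Map<String, Entry> cohortCache = new HashMap<>();
        private String currentTxId; // tx whose 3-phase commit is in progress

        // Abort is now honored both for the current tx and for one still
        // sitting in the queue awaiting CAN_COMMIT.
        boolean handleAbort(String txId) {
            Entry entry;
            if (txId.equals(currentTxId)) {
                // Keep the cache entry: the tx may still commit locally if an
                // in-flight replication succeeds (the "pass false" case).
                entry = cohortCache.get(txId);
                currentTxId = null;
            } else {
                // New behavior: a queued tx is aborted and dropped from the cache.
                entry = cohortCache.remove(txId);
            }
            if (entry == null) {
                return false; // unknown tx; previously no reply was sent at all
            }
            entry.aborted = true; // cohort.abort() in the real code
            return true;          // an AbortTransactionReply is now always sent
        }
    }
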
 .../controller/cluster/datastore/Shard.java   |  48 +-------
 .../datastore/ShardCommitCoordinator.java     |  91 +++++++++++---
 .../cluster/datastore/ShardTest.java          | 115 ++++++++++++++----
 3 files changed, 170 insertions(+), 84 deletions(-)

diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
index ce3cae481e..5a10c3e961 100644
--- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
+++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
@@ -18,9 +18,6 @@ import akka.serialization.Serialization;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
@@ -35,7 +32,6 @@ import org.opendaylight.controller.cluster.datastore.identifiers.ShardTransactionIdentifier;
 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardMBeanFactory;
 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
-import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
@@ -83,7 +79,7 @@ import scala.concurrent.duration.FiniteDuration;
  */
 public class Shard extends RaftActor {
 
-    private static final Object TX_COMMIT_TIMEOUT_CHECK_MESSAGE = "txCommitTimeoutCheck";
+    protected static final Object TX_COMMIT_TIMEOUT_CHECK_MESSAGE = "txCommitTimeoutCheck";
 
     @VisibleForTesting
     static final Object GET_SHARD_MBEAN_MESSAGE = "getShardMBeanMessage";
@@ -326,7 +322,7 @@ public class Shard extends RaftActor {
     }
 
     void continueCommit(final CohortEntry cohortEntry) throws Exception {
-        final DataTreeCandidate candidate = cohortEntry.getCohort().getCandidate();
+        final DataTreeCandidate candidate = cohortEntry.getCandidate();
 
         // If we do not have any followers and we are not using persistence
         // or if cohortEntry has no modifications
@@ -349,10 +345,7 @@ public class Shard extends RaftActor {
         LOG.debug("{}: Finishing commit for transaction {}", persistenceId(), cohortEntry.getTransactionID());
 
         try {
-            // We block on the future here so we don't have to worry about possibly accessing our
-            // state on a different thread outside of our dispatcher. Also, the data store
-            // currently uses a same thread executor anyway.
-            cohortEntry.getCohort().commit().get();
+            cohortEntry.commit();
 
             sender.tell(CommitTransactionReply.INSTANCE.toSerializable(), getSelf());
 
@@ -385,7 +378,7 @@ public class Shard extends RaftActor {
             cohortEntry = commitCoordinator.getAndRemoveCohortEntry(transactionID);
             if(cohortEntry != null) {
                 try {
-                    store.applyForeignCandidate(transactionID, cohortEntry.getCohort().getCandidate());
+                    store.applyForeignCandidate(transactionID, cohortEntry.getCandidate());
                 } catch (DataValidationFailedException e) {
                     shardMBean.incrementFailedTransactionsCount();
                     LOG.error("{}: Failed to re-apply transaction {}", persistenceId(), transactionID, e);
@@ -494,38 +487,7 @@ public class Shard extends RaftActor {
     }
 
     void doAbortTransaction(final String transactionID, final ActorRef sender) {
-        final CohortEntry cohortEntry = commitCoordinator.getCohortEntryIfCurrent(transactionID);
-        if(cohortEntry != null) {
-            LOG.debug("{}: Aborting transaction {}", persistenceId(), transactionID);
-
-            // We don't remove the cached cohort entry here (ie pass false) in case the Tx was
-            // aborted during replication in which case we may still commit locally if replication
-            // succeeds.
-            commitCoordinator.currentTransactionComplete(transactionID, false);
-
-            final ListenableFuture<Void> future = cohortEntry.getCohort().abort();
-            final ActorRef self = getSelf();
-
-            Futures.addCallback(future, new FutureCallback<Void>() {
-                @Override
-                public void onSuccess(final Void v) {
-                    shardMBean.incrementAbortTransactionsCount();
-
-                    if(sender != null) {
-                        sender.tell(AbortTransactionReply.INSTANCE.toSerializable(), self);
-                    }
-                }
-
-                @Override
-                public void onFailure(final Throwable t) {
-                    LOG.error("{}: An exception happened during abort", persistenceId(), t);
-
-                    if(sender != null) {
-                        sender.tell(new akka.actor.Status.Failure(t), self);
-                    }
-                }
-            });
-        }
+        commitCoordinator.handleAbort(transactionID, sender, this);
     }
 
     private void handleCreateTransaction(final Object message) {
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java
index 3431755b51..5e88cc73d3 100644
--- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java
+++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java
@@ -21,6 +21,7 @@ import java.util.Queue;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import org.opendaylight.controller.cluster.datastore.compat.BackwardsCompatibleThreePhaseCommitCohort;
+import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
 import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
@@ -30,6 +31,7 @@ import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
 import org.opendaylight.controller.cluster.datastore.modification.Modification;
 import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
 import org.slf4j.Logger;
 
 /**
@@ -96,6 +98,10 @@ class ShardCommitCoordinator {
     private boolean queueCohortEntry(CohortEntry cohortEntry, ActorRef sender, Shard shard) {
         if(queuedCohortEntries.size() < queueCapacity) {
             queuedCohortEntries.offer(cohortEntry);
+
+            log.debug("{}: Enqueued transaction {}, queue size {}", name, cohortEntry.getTransactionID(),
+                    queuedCohortEntries.size());
+
             return true;
         } else {
             cohortCache.remove(cohortEntry.getTransactionID());
@@ -314,10 +320,7 @@ class ShardCommitCoordinator {
     private void doCanCommit(final CohortEntry cohortEntry) {
         boolean canCommit = false;
         try {
-            // We block on the future here so we don't have to worry about possibly accessing our
-            // state on a different thread outside of our dispatcher. Also, the data store
-            // currently uses a same thread executor anyway.
-            canCommit = cohortEntry.getCohort().canCommit().get();
+            canCommit = cohortEntry.canCommit();
 
             log.debug("{}: canCommit for {}: {}", name, cohortEntry.getTransactionID(), canCommit);
 
@@ -361,10 +364,7 @@ class ShardCommitCoordinator {
         // normally fail since we ensure only one concurrent 3-phase commit.
 
         try {
-            // We block on the future here so we don't have to worry about possibly accessing our
-            // state on a different thread outside of our dispatcher. Also, the data store
-            // currently uses a same thread executor anyway.
-            cohortEntry.getCohort().preCommit().get();
+            cohortEntry.preCommit();
 
             cohortEntry.getShard().continueCommit(cohortEntry);
 
@@ -409,6 +409,41 @@ class ShardCommitCoordinator {
         return doCommit(cohortEntry);
     }
 
+    void handleAbort(final String transactionID, final ActorRef sender, final Shard shard) {
+        CohortEntry cohortEntry = getCohortEntryIfCurrent(transactionID);
+        if(cohortEntry != null) {
+            // We don't remove the cached cohort entry here (ie pass false) in case the Tx was
+            // aborted during replication in which case we may still commit locally if replication
+            // succeeds.
+            currentTransactionComplete(transactionID, false);
+        } else {
+            cohortEntry = getAndRemoveCohortEntry(transactionID);
+        }
+
+        if(cohortEntry == null) {
+            return;
+        }
+
+        log.debug("{}: Aborting transaction {}", name, transactionID);
+
+        final ActorRef self = shard.getSelf();
+        try {
+            cohortEntry.abort();
+
+            shard.getShardMBean().incrementAbortTransactionsCount();
+
+            if(sender != null) {
+                sender.tell(new AbortTransactionReply().toSerializable(), self);
+            }
+        } catch (Exception e) {
+            log.error("{}: An exception happened during abort", name, e);
+
+            if(sender != null) {
+                sender.tell(new akka.actor.Status.Failure(e), self);
+            }
+        }
+    }
+
     /**
      * Returns the cohort entry for the Tx commit currently in progress if the given transaction ID
     * matches the current entry.
@@ -483,12 +518,12 @@ class ShardCommitCoordinator {
             } else if(next.isExpired(cacheExpiryTimeoutInMillis)) {
                 log.warn("{}: canCommit for transaction {} was not received within {} ms - entry removed from cache",
                         name, next.getTransactionID(), cacheExpiryTimeoutInMillis);
-
-                iter.remove();
-                cohortCache.remove(next.getTransactionID());
-            } else {
+            } else if(!next.isAborted()) {
                 break;
             }
+
+            iter.remove();
+            cohortCache.remove(next.getTransactionID());
         }
     }
 
@@ -511,6 +546,7 @@ class ShardCommitCoordinator {
         private boolean doImmediateCommit;
         private final Stopwatch lastAccessTimer = Stopwatch.createStarted();
         private int totalBatchedModificationsReceived;
+        private boolean aborted;
 
         CohortEntry(String transactionID, ReadWriteShardDataTreeTransaction transaction) {
             this.transaction = Preconditions.checkNotNull(transaction);
@@ -539,8 +575,8 @@ class ShardCommitCoordinator {
             return transactionID;
         }
 
-        ShardDataTreeCohort getCohort() {
-            return cohort;
+        DataTreeCandidate getCandidate() {
+            return cohort.getCandidate();
         }
 
         int getTotalBatchedModificationsReceived() {
@@ -565,6 +601,28 @@ class ShardCommitCoordinator {
             }
         }
 
+        boolean canCommit() throws InterruptedException, ExecutionException {
+            // We block on the future here (and also preCommit(), commit(), abort()) so we don't have to worry
+            // about possibly accessing our state on a different thread outside of our dispatcher.
+            // TODO: the ShardDataTreeCohort returns immediate Futures anyway, which begs the question - why
+            // bother even returning Futures from ShardDataTreeCohort if we have to treat them synchronously
+            // anyway? The Futures are really a remnant from when we were using the InMemoryDataBroker.
+            return cohort.canCommit().get();
+        }
+
+        void preCommit() throws InterruptedException, ExecutionException {
+            cohort.preCommit().get();
+        }
+
+        void commit() throws InterruptedException, ExecutionException {
+            cohort.commit().get();
+        }
+
+        void abort() throws InterruptedException, ExecutionException {
+            aborted = true;
+            cohort.abort().get();
+        }
+
         void ready(CohortDecorator cohortDecorator, boolean doImmediateCommit) {
             Preconditions.checkState(cohort == null, "cohort was already set");
 
@@ -610,6 +668,11 @@ class ShardCommitCoordinator {
             this.shard = shard;
         }
 
+        boolean isAborted() {
+            return aborted;
+        }
+
         @Override
         public String toString() {
             StringBuilder builder = new StringBuilder();
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java
index 83b15b99df..24a5225b87 100644
--- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java
+++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java
@@ -10,6 +10,7 @@ import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.inOrder;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.verify;
 import static org.opendaylight.controller.cluster.datastore.DataStoreVersions.CURRENT_VERSION;
 import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
@@ -124,29 +125,6 @@ public class ShardTest extends AbstractShardTest {
     private static final String DUMMY_DATA = "Dummy data as snapshot sequence number is set to 0 in InMemorySnapshotStore and journal recovery seq number will start from 1";
 
-    final CountDownLatch recoveryComplete = new CountDownLatch(1);
-
-    protected Props newShardPropsWithRecoveryComplete() {
-
-        final Creator<Shard> creator = new Creator<Shard>() {
-            @Override
-            public Shard create() throws Exception {
-                return new Shard(shardID, Collections.<String, String>emptyMap(),
-                        newDatastoreContext(), SCHEMA_CONTEXT) {
-                    @Override
-                    protected void onRecoveryComplete() {
-                        try {
-                            super.onRecoveryComplete();
-                        } finally {
-                            recoveryComplete.countDown();
-                        }
-                    }
-                };
-            }
-        };
-        return Props.create(new DelegatingShardCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId());
-    }
-
     @Test
     public void testRegisterChangeListener() throws Exception {
         new ShardTestKit(getSystem()) {{
@@ -478,9 +456,14 @@ public class ShardTest extends AbstractShardTest {
 
     @Test
     public void testApplySnapshot() throws Exception {
+
+        ShardTestKit testkit = new ShardTestKit(getSystem());
+
         final TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(),
                 "testApplySnapshot");
 
+        testkit.waitUntilLeader(shard);
+
         final DataTree store = InMemoryDataTreeFactory.getInstance().create();
         store.setSchemaContext(SCHEMA_CONTEXT);
 
@@ -509,8 +492,12 @@ public class ShardTest extends AbstractShardTest {
 
     @Test
     public void testApplyState() throws Exception {
+        ShardTestKit testkit = new ShardTestKit(getSystem());
+
         final TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(), "testApplyState");
 
+        testkit.waitUntilLeader(shard);
+
         final NormalizedNode<?, ?> node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
 
         final ApplyState applyState = new ApplyState(null, "test", new ReplicatedLogImplEntry(1, 2,
@@ -527,9 +514,11 @@ public class ShardTest extends AbstractShardTest {
 
     @Test
     public void testApplyStateWithCandidatePayload() throws Exception {
-        final TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardPropsWithRecoveryComplete(), "testApplyState");
+        ShardTestKit testkit = new ShardTestKit(getSystem());
+
+        final TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(), "testApplyState");
 
-        recoveryComplete.await(5, TimeUnit.SECONDS);
+        testkit.waitUntilLeader(shard);
 
         final NormalizedNode<?, ?> node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
         final DataTreeCandidate candidate = DataTreeCandidates.fromNormalizedNode(TestModel.TEST_PATH, node);
@@ -2129,11 +2118,11 @@ public class ShardTest extends AbstractShardTest {
     }
 
     @Test
-    public void testAbortTransaction() throws Throwable {
+    public void testAbortCurrentTransaction() throws Throwable {
         new ShardTestKit(getSystem()) {{
             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
-                    "testAbortTransaction");
+                    "testAbortCurrentTransaction");
 
             waitUntilLeader(shard);
 
@@ -2195,6 +2184,78 @@ public class ShardTest extends AbstractShardTest {
         }};
     }
 
+    @Test
+    public void testAbortQueuedTransaction() throws Throwable {
+        dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1);
+        new ShardTestKit(getSystem()) {{
+            final AtomicReference<CountDownLatch> cleanupCheckLatch = new AtomicReference<>();
+            @SuppressWarnings("serial")
+            final Creator<Shard> creator = new Creator<Shard>() {
+                @Override
+                public Shard create() throws Exception {
+                    return new Shard(shardID, Collections.<String, String>emptyMap(),
+                            dataStoreContextBuilder.build(), SCHEMA_CONTEXT) {
+                        @Override
+                        public void onReceiveCommand(final Object message) throws Exception {
+                            super.onReceiveCommand(message);
+                            if(message.equals(TX_COMMIT_TIMEOUT_CHECK_MESSAGE)) {
+                                if(cleanupCheckLatch.get() != null) {
+                                    cleanupCheckLatch.get().countDown();
+                                }
+                            }
+                        }
+                    };
+                }
+            };
+
+            final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
+                    Props.create(new DelegatingShardCreator(creator)).withDispatcher(
+                            Dispatchers.DefaultDispatcherId()), "testAbortQueuedTransaction");
+
+            waitUntilLeader(shard);
+
+            final String transactionID = "tx1";
+
+            final MutableCompositeModification modification = new MutableCompositeModification();
+            final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort");
+            doReturn(Futures.immediateFuture(null)).when(cohort).abort();
+
+            final FiniteDuration duration = duration("5 seconds");
+
+            // Ready the tx.
+
+            shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
+                    cohort, modification, true, false), getRef());
+            expectMsgClass(duration, ReadyTransactionReply.class);
+
+            assertEquals("getPendingTxCommitQueueSize", 1, shard.underlyingActor().getPendingTxCommitQueueSize());
+
+            // Send the AbortTransaction message.
+
+            shard.tell(new AbortTransaction(transactionID).toSerializable(), getRef());
+            expectMsgClass(duration, AbortTransactionReply.SERIALIZABLE_CLASS);
+
+            verify(cohort).abort();
+
+            // Verify the tx cohort is removed from the queue at the cleanup check interval.
+
+            cleanupCheckLatch.set(new CountDownLatch(1));
+            assertEquals("TX_COMMIT_TIMEOUT_CHECK_MESSAGE received", true,
+                    cleanupCheckLatch.get().await(5, TimeUnit.SECONDS));
+
+            assertEquals("getPendingTxCommitQueueSize", 0, shard.underlyingActor().getPendingTxCommitQueueSize());
+
+            // Now send CanCommitTransaction - should fail.
+
+            shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
+
+            Throwable failure = expectMsgClass(duration, akka.actor.Status.Failure.class).cause();
+            assertTrue("Failure type", failure instanceof IllegalStateException);
+
+            shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
+        }};
+    }
+
     @Test
     public void testCreateSnapshot() throws Exception {
         testCreateSnapshot(true, "testCreateSnapshot");
-- 
2.36.6
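Reviewer note, not part of the commit: the sketch below condenses the
reworked timeout-check loop, which now drains aborted entries from the
pending-commit queue in addition to expired ones. QueueSweepSketch and
Entry are illustrative stand-ins for the real ShardCommitCoordinator
state; only the expired/aborted bookkeeping is kept.

    import java.util.ArrayDeque;
    import java.util.HashMap;
    import java.util.Iterator;
    import java.util.Map;
    import java.util.Queue;

    class QueueSweepSketch {
        static final class Entry {
            final String txId;
            boolean aborted;
            Entry(String txId) { this.txId = txId; }
            boolean isExpired(long timeoutMillis) { return false; } // elapsed-time check in the real code
        }

        private final Map<String, Entry> cohortCache = new HashMap<>();
        private final Queue<Entry> queuedCohortEntries = new ArrayDeque<>();

        // Runs at each TX_COMMIT_TIMEOUT_CHECK tick: both expired and (new)
        // aborted entries are removed from the head of the queue and the cache.
        void cleanupExpiredCohortEntries(long cacheExpiryTimeoutInMillis) {
            for (Iterator<Entry> iter = queuedCohortEntries.iterator(); iter.hasNext();) {
                Entry next = iter.next();
                if (!next.isExpired(cacheExpiryTimeoutInMillis) && !next.aborted) {
                    break; // first live entry stops the sweep, as in the patched loop
                }
                iter.remove();
                cohortCache.remove(next.txId);
            }
        }
    }

In testAbortQueuedTransaction terms: after the tx is readied and then
aborted, the next timeout-check tick drains its entry, so
getPendingTxCommitQueueSize() drops to 0 and a subsequent
CanCommitTransaction fails instead of being silently dequeued later.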