I noticed when a tx times out on the front-end during CAN_COMMIT, it
tries to abort the tx but it may not get aborted in the Shard and the
front-end gets an AskTimeoutEx on the abort. The reason is that the
Shard only processes the abort request if the tx is the current tx being
committed. If it isn't, the request is ignored and no response is sent,
resulting in the front-end timeout.
I think it makes sense to also process the abort if the tx is sitting in
the queue awaiting CAN_COMMIT. If the front-end says to abort for any
reason, the Shard should honor it. Also, if it isn't aborted, the Shard
may dequeue it sometime later and attempt to commit it which can lead to
unpredictable results if prior commits failed.
As per the comments in the Helium patch, I did some re-factoring to make
it a bit cleaner. I moved the abort code from the Shard to the
ShardCommitCoordinator. This makes it consistent with the other tx
phases where the Shard mostly delegates to the ShardCommitCoordinator. I
also removed the getCohort method from CohortEntry and added appropriate
methods so the internal cohort instance isn't exposed.
There's more refactoring/cleanup that can be done re: Futures and also
moving CohortEntry into its own class (it's large enough) but I don't
want to overload this patch.
Change-Id: I73c79a5e4a2b39b7ee4d97a011de2d29b050dbc4
Signed-off-by: Tom Pantelis <tpanteli@brocade.com>
(cherry picked from commit
70e0f223f41ab77de24b6df940d12acd39279e9a)
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardMBeanFactory;
import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
-import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
*/
public class Shard extends RaftActor {
- private static final Object TX_COMMIT_TIMEOUT_CHECK_MESSAGE = "txCommitTimeoutCheck";
+ protected static final Object TX_COMMIT_TIMEOUT_CHECK_MESSAGE = "txCommitTimeoutCheck";
@VisibleForTesting
static final Object GET_SHARD_MBEAN_MESSAGE = "getShardMBeanMessage";
}
void continueCommit(final CohortEntry cohortEntry) throws Exception {
- final DataTreeCandidate candidate = cohortEntry.getCohort().getCandidate();
+ final DataTreeCandidate candidate = cohortEntry.getCandidate();
// If we do not have any followers and we are not using persistence
// or if cohortEntry has no modifications
LOG.debug("{}: Finishing commit for transaction {}", persistenceId(), cohortEntry.getTransactionID());
try {
- // We block on the future here so we don't have to worry about possibly accessing our
- // state on a different thread outside of our dispatcher. Also, the data store
- // currently uses a same thread executor anyway.
- cohortEntry.getCohort().commit().get();
+ cohortEntry.commit();
sender.tell(CommitTransactionReply.INSTANCE.toSerializable(), getSelf());
cohortEntry = commitCoordinator.getAndRemoveCohortEntry(transactionID);
if(cohortEntry != null) {
try {
- store.applyForeignCandidate(transactionID, cohortEntry.getCohort().getCandidate());
+ store.applyForeignCandidate(transactionID, cohortEntry.getCandidate());
} catch (DataValidationFailedException e) {
shardMBean.incrementFailedTransactionsCount();
LOG.error("{}: Failed to re-apply transaction {}", persistenceId(), transactionID, e);
}
void doAbortTransaction(final String transactionID, final ActorRef sender) {
- final CohortEntry cohortEntry = commitCoordinator.getCohortEntryIfCurrent(transactionID);
- if(cohortEntry != null) {
- LOG.debug("{}: Aborting transaction {}", persistenceId(), transactionID);
-
- // We don't remove the cached cohort entry here (ie pass false) in case the Tx was
- // aborted during replication in which case we may still commit locally if replication
- // succeeds.
- commitCoordinator.currentTransactionComplete(transactionID, false);
-
- final ListenableFuture<Void> future = cohortEntry.getCohort().abort();
- final ActorRef self = getSelf();
-
- Futures.addCallback(future, new FutureCallback<Void>() {
- @Override
- public void onSuccess(final Void v) {
- shardMBean.incrementAbortTransactionsCount();
-
- if(sender != null) {
- sender.tell(AbortTransactionReply.INSTANCE.toSerializable(), self);
- }
- }
-
- @Override
- public void onFailure(final Throwable t) {
- LOG.error("{}: An exception happened during abort", persistenceId(), t);
-
- if(sender != null) {
- sender.tell(new akka.actor.Status.Failure(t), self);
- }
- }
- });
- }
+ commitCoordinator.handleAbort(transactionID, sender, this);
}
private void handleCreateTransaction(final Object message) {
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.opendaylight.controller.cluster.datastore.compat.BackwardsCompatibleThreePhaseCommitCohort;
+import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
import org.opendaylight.controller.cluster.datastore.modification.Modification;
import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
import org.slf4j.Logger;
/**
private boolean queueCohortEntry(CohortEntry cohortEntry, ActorRef sender, Shard shard) {
if(queuedCohortEntries.size() < queueCapacity) {
queuedCohortEntries.offer(cohortEntry);
+
+ log.debug("{}: Enqueued transaction {}, queue size {}", name, cohortEntry.getTransactionID(),
+ queuedCohortEntries.size());
+
return true;
} else {
cohortCache.remove(cohortEntry.getTransactionID());
private void doCanCommit(final CohortEntry cohortEntry) {
boolean canCommit = false;
try {
- // We block on the future here so we don't have to worry about possibly accessing our
- // state on a different thread outside of our dispatcher. Also, the data store
- // currently uses a same thread executor anyway.
- canCommit = cohortEntry.getCohort().canCommit().get();
+ canCommit = cohortEntry.canCommit();
log.debug("{}: canCommit for {}: {}", name, cohortEntry.getTransactionID(), canCommit);
// normally fail since we ensure only one concurrent 3-phase commit.
try {
- // We block on the future here so we don't have to worry about possibly accessing our
- // state on a different thread outside of our dispatcher. Also, the data store
- // currently uses a same thread executor anyway.
- cohortEntry.getCohort().preCommit().get();
+ cohortEntry.preCommit();
cohortEntry.getShard().continueCommit(cohortEntry);
return doCommit(cohortEntry);
}
+ void handleAbort(final String transactionID, final ActorRef sender, final Shard shard) {
+ CohortEntry cohortEntry = getCohortEntryIfCurrent(transactionID);
+ if(cohortEntry != null) {
+ // We don't remove the cached cohort entry here (ie pass false) in case the Tx was
+ // aborted during replication in which case we may still commit locally if replication
+ // succeeds.
+ currentTransactionComplete(transactionID, false);
+ } else {
+ cohortEntry = getAndRemoveCohortEntry(transactionID);
+ }
+
+ if(cohortEntry == null) {
+ return;
+ }
+
+ log.debug("{}: Aborting transaction {}", name, transactionID);
+
+ final ActorRef self = shard.getSelf();
+ try {
+ cohortEntry.abort();
+
+ shard.getShardMBean().incrementAbortTransactionsCount();
+
+ if(sender != null) {
+ sender.tell(new AbortTransactionReply().toSerializable(), self);
+ }
+ } catch (Exception e) {
+ log.error("{}: An exception happened during abort", name, e);
+
+ if(sender != null) {
+ sender.tell(new akka.actor.Status.Failure(e), self);
+ }
+ }
+ }
+
/**
* Returns the cohort entry for the Tx commit currently in progress if the given transaction ID
* matches the current entry.
} else if(next.isExpired(cacheExpiryTimeoutInMillis)) {
log.warn("{}: canCommit for transaction {} was not received within {} ms - entry removed from cache",
name, next.getTransactionID(), cacheExpiryTimeoutInMillis);
-
- iter.remove();
- cohortCache.remove(next.getTransactionID());
- } else {
+ } else if(!next.isAborted()) {
break;
}
+
+ iter.remove();
+ cohortCache.remove(next.getTransactionID());
}
}
private boolean doImmediateCommit;
private final Stopwatch lastAccessTimer = Stopwatch.createStarted();
private int totalBatchedModificationsReceived;
+ private boolean aborted;
CohortEntry(String transactionID, ReadWriteShardDataTreeTransaction transaction) {
this.transaction = Preconditions.checkNotNull(transaction);
return transactionID;
}
- ShardDataTreeCohort getCohort() {
- return cohort;
+ DataTreeCandidate getCandidate() {
+ return cohort.getCandidate();
}
int getTotalBatchedModificationsReceived() {
}
}
+ boolean canCommit() throws InterruptedException, ExecutionException {
+ // We block on the future here (and also preCommit(), commit(), abort()) so we don't have to worry
+ // about possibly accessing our state on a different thread outside of our dispatcher.
+ // TODO: the ShardDataTreeCohort returns immediate Futures anyway which begs the question - why
+ // bother even returning Futures from ShardDataTreeCohort if we have to treat them synchronously
+ // anyway?. The Futures are really a remnant from when we were using the InMemoryDataBroker.
+ return cohort.canCommit().get();
+ }
+
+ void preCommit() throws InterruptedException, ExecutionException {
+ cohort.preCommit().get();
+ }
+
+ void commit() throws InterruptedException, ExecutionException {
+ cohort.commit().get();
+ }
+
+ void abort() throws InterruptedException, ExecutionException {
+ aborted = true;
+ cohort.abort().get();
+ }
+
void ready(CohortDecorator cohortDecorator, boolean doImmediateCommit) {
Preconditions.checkState(cohort == null, "cohort was already set");
this.shard = shard;
}
+
+ boolean isAborted() {
+ return aborted;
+ }
+
@Override
public String toString() {
StringBuilder builder = new StringBuilder();
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.verify;
import static org.opendaylight.controller.cluster.datastore.DataStoreVersions.CURRENT_VERSION;
import akka.actor.ActorRef;
import akka.actor.ActorSelection;
private static final String DUMMY_DATA = "Dummy data as snapshot sequence number is set to 0 in InMemorySnapshotStore and journal recovery seq number will start from 1";
- final CountDownLatch recoveryComplete = new CountDownLatch(1);
-
- protected Props newShardPropsWithRecoveryComplete() {
-
- final Creator<Shard> creator = new Creator<Shard>() {
- @Override
- public Shard create() throws Exception {
- return new Shard(shardID, Collections.<String,String>emptyMap(),
- newDatastoreContext(), SCHEMA_CONTEXT) {
- @Override
- protected void onRecoveryComplete() {
- try {
- super.onRecoveryComplete();
- } finally {
- recoveryComplete.countDown();
- }
- }
- };
- }
- };
- return Props.create(new DelegatingShardCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId());
- }
-
@Test
public void testRegisterChangeListener() throws Exception {
new ShardTestKit(getSystem()) {{
@Test
public void testApplySnapshot() throws Exception {
+
+ ShardTestKit testkit = new ShardTestKit(getSystem());
+
final TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(),
"testApplySnapshot");
+ testkit.waitUntilLeader(shard);
+
final DataTree store = InMemoryDataTreeFactory.getInstance().create();
store.setSchemaContext(SCHEMA_CONTEXT);
@Test
public void testApplyState() throws Exception {
+ ShardTestKit testkit = new ShardTestKit(getSystem());
+
final TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(), "testApplyState");
+ testkit.waitUntilLeader(shard);
+
final NormalizedNode<?, ?> node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
final ApplyState applyState = new ApplyState(null, "test", new ReplicatedLogImplEntry(1, 2,
@Test
public void testApplyStateWithCandidatePayload() throws Exception {
- final TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardPropsWithRecoveryComplete(), "testApplyState");
+ ShardTestKit testkit = new ShardTestKit(getSystem());
+
+ final TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(), "testApplyState");
- recoveryComplete.await(5, TimeUnit.SECONDS);
+ testkit.waitUntilLeader(shard);
final NormalizedNode<?, ?> node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
final DataTreeCandidate candidate = DataTreeCandidates.fromNormalizedNode(TestModel.TEST_PATH, node);
}
@Test
- public void testAbortTransaction() throws Throwable {
+ public void testAbortCurrentTransaction() throws Throwable {
new ShardTestKit(getSystem()) {{
final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
- "testAbortTransaction");
+ "testAbortCurrentTransaction");
waitUntilLeader(shard);
}};
}
+ @Test
+ public void testAbortQueuedTransaction() throws Throwable {
+ dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1);
+ new ShardTestKit(getSystem()) {{
+ final AtomicReference<CountDownLatch> cleaupCheckLatch = new AtomicReference<>();
+ @SuppressWarnings("serial")
+ final Creator<Shard> creator = new Creator<Shard>() {
+ @Override
+ public Shard create() throws Exception {
+ return new Shard(shardID, Collections.<String,String>emptyMap(),
+ dataStoreContextBuilder.build(), SCHEMA_CONTEXT) {
+ @Override
+ public void onReceiveCommand(final Object message) throws Exception {
+ super.onReceiveCommand(message);
+ if(message.equals(TX_COMMIT_TIMEOUT_CHECK_MESSAGE)) {
+ if(cleaupCheckLatch.get() != null) {
+ cleaupCheckLatch.get().countDown();
+ }
+ }
+ }
+ };
+ }
+ };
+
+ final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
+ Props.create(new DelegatingShardCreator(creator)).withDispatcher(
+ Dispatchers.DefaultDispatcherId()), "testAbortQueuedTransaction");
+
+ waitUntilLeader(shard);
+
+ final String transactionID = "tx1";
+
+ final MutableCompositeModification modification = new MutableCompositeModification();
+ final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort");
+ doReturn(Futures.immediateFuture(null)).when(cohort).abort();
+
+ final FiniteDuration duration = duration("5 seconds");
+
+ // Ready the tx.
+
+ shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
+ cohort, modification, true, false), getRef());
+ expectMsgClass(duration, ReadyTransactionReply.class);
+
+ assertEquals("getPendingTxCommitQueueSize", 1, shard.underlyingActor().getPendingTxCommitQueueSize());
+
+ // Send the AbortTransaction message.
+
+ shard.tell(new AbortTransaction(transactionID).toSerializable(), getRef());
+ expectMsgClass(duration, AbortTransactionReply.SERIALIZABLE_CLASS);
+
+ verify(cohort).abort();
+
+ // Verify the tx cohort is removed from queue at the cleanup check interval.
+
+ cleaupCheckLatch.set(new CountDownLatch(1));
+ assertEquals("TX_COMMIT_TIMEOUT_CHECK_MESSAGE received", true,
+ cleaupCheckLatch.get().await(5, TimeUnit.SECONDS));
+
+ assertEquals("getPendingTxCommitQueueSize", 0, shard.underlyingActor().getPendingTxCommitQueueSize());
+
+ // Now send CanCommitTransaction - should fail.
+
+ shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
+
+ Throwable failure = expectMsgClass(duration, akka.actor.Status.Failure.class).cause();
+ assertTrue("Failure type", failure instanceof IllegalStateException);
+
+ shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
+ }};
+ }
+
@Test
public void testCreateSnapshot() throws Exception {
testCreateSnapshot(true, "testCreateSnapshot");