import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardMBeanFactory;
import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
-import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
*/
public class Shard extends RaftActor {
- private static final Object TX_COMMIT_TIMEOUT_CHECK_MESSAGE = "txCommitTimeoutCheck";
+ protected static final Object TX_COMMIT_TIMEOUT_CHECK_MESSAGE = "txCommitTimeoutCheck";
@VisibleForTesting
static final Object GET_SHARD_MBEAN_MESSAGE = "getShardMBeanMessage";
}
void continueCommit(final CohortEntry cohortEntry) throws Exception {
- final DataTreeCandidate candidate = cohortEntry.getCohort().getCandidate();
+ final DataTreeCandidate candidate = cohortEntry.getCandidate();
// If we do not have any followers and we are not using persistence
// or if cohortEntry has no modifications
LOG.debug("{}: Finishing commit for transaction {}", persistenceId(), cohortEntry.getTransactionID());
try {
- // We block on the future here so we don't have to worry about possibly accessing our
- // state on a different thread outside of our dispatcher. Also, the data store
- // currently uses a same thread executor anyway.
- cohortEntry.getCohort().commit().get();
+ cohortEntry.commit();
sender.tell(CommitTransactionReply.INSTANCE.toSerializable(), getSelf());
cohortEntry = commitCoordinator.getAndRemoveCohortEntry(transactionID);
if(cohortEntry != null) {
try {
- store.applyForeignCandidate(transactionID, cohortEntry.getCohort().getCandidate());
+ store.applyForeignCandidate(transactionID, cohortEntry.getCandidate());
} catch (DataValidationFailedException e) {
shardMBean.incrementFailedTransactionsCount();
LOG.error("{}: Failed to re-apply transaction {}", persistenceId(), transactionID, e);
}
void doAbortTransaction(final String transactionID, final ActorRef sender) {
- final CohortEntry cohortEntry = commitCoordinator.getCohortEntryIfCurrent(transactionID);
- if(cohortEntry != null) {
- LOG.debug("{}: Aborting transaction {}", persistenceId(), transactionID);
-
- // We don't remove the cached cohort entry here (ie pass false) in case the Tx was
- // aborted during replication in which case we may still commit locally if replication
- // succeeds.
- commitCoordinator.currentTransactionComplete(transactionID, false);
-
- final ListenableFuture<Void> future = cohortEntry.getCohort().abort();
- final ActorRef self = getSelf();
-
- Futures.addCallback(future, new FutureCallback<Void>() {
- @Override
- public void onSuccess(final Void v) {
- shardMBean.incrementAbortTransactionsCount();
-
- if(sender != null) {
- sender.tell(AbortTransactionReply.INSTANCE.toSerializable(), self);
- }
- }
-
- @Override
- public void onFailure(final Throwable t) {
- LOG.error("{}: An exception happened during abort", persistenceId(), t);
-
- if(sender != null) {
- sender.tell(new akka.actor.Status.Failure(t), self);
- }
- }
- });
- }
+ commitCoordinator.handleAbort(transactionID, sender, this); // Abort handling, including the reply to sender, now lives in the commit coordinator.
}
private void handleCreateTransaction(final Object message) {
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.opendaylight.controller.cluster.datastore.compat.BackwardsCompatibleThreePhaseCommitCohort;
+import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
import org.opendaylight.controller.cluster.datastore.modification.Modification;
import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
import org.slf4j.Logger;
/**
private boolean queueCohortEntry(CohortEntry cohortEntry, ActorRef sender, Shard shard) {
if(queuedCohortEntries.size() < queueCapacity) {
queuedCohortEntries.offer(cohortEntry);
+
+ log.debug("{}: Enqueued transaction {}, queue size {}", name, cohortEntry.getTransactionID(),
+ queuedCohortEntries.size());
+
return true;
} else {
cohortCache.remove(cohortEntry.getTransactionID());
private void doCanCommit(final CohortEntry cohortEntry) {
boolean canCommit = false;
try {
- // We block on the future here so we don't have to worry about possibly accessing our
- // state on a different thread outside of our dispatcher. Also, the data store
- // currently uses a same thread executor anyway.
- canCommit = cohortEntry.getCohort().canCommit().get();
+ canCommit = cohortEntry.canCommit();
log.debug("{}: canCommit for {}: {}", name, cohortEntry.getTransactionID(), canCommit);
// normally fail since we ensure only one concurrent 3-phase commit.
try {
- // We block on the future here so we don't have to worry about possibly accessing our
- // state on a different thread outside of our dispatcher. Also, the data store
- // currently uses a same thread executor anyway.
- cohortEntry.getCohort().preCommit().get();
+ cohortEntry.preCommit();
cohortEntry.getShard().continueCommit(cohortEntry);
return doCommit(cohortEntry);
}
+ void handleAbort(final String transactionID, final ActorRef sender, final Shard shard) { // Aborts the Tx with the given ID if an entry for it is found, replying to sender when one is supplied.
+ CohortEntry cohortEntry = getCohortEntryIfCurrent(transactionID);
+ if(cohortEntry != null) {
+ // We don't remove the cached cohort entry here (ie pass false) in case the Tx was
+ // aborted during replication in which case we may still commit locally if replication
+ // succeeds.
+ currentTransactionComplete(transactionID, false);
+ } else { // Not the Tx currently being committed - fetch and remove its entry by ID instead.
+ cohortEntry = getAndRemoveCohortEntry(transactionID);
+ }
+
+ if(cohortEntry == null) { // No entry for this Tx: nothing to abort, and no reply is sent.
+ return;
+ }
+
+ log.debug("{}: Aborting transaction {}", name, transactionID);
+
+ final ActorRef self = shard.getSelf();
+ try {
+ cohortEntry.abort(); // Declared to throw InterruptedException/ExecutionException; both are handled by the catch below.
+
+ shard.getShardMBean().incrementAbortTransactionsCount();
+
+ if(sender != null) { // Reply only if a sender was supplied.
+ sender.tell(new AbortTransactionReply().toSerializable(), self);
+ }
+ } catch (Exception e) { // Log and report the failure to the sender (if any) rather than propagating it.
+ log.error("{}: An exception happened during abort", name, e);
+
+ if(sender != null) {
+ sender.tell(new akka.actor.Status.Failure(e), self);
+ }
+ }
+ }
+
/**
* Returns the cohort entry for the Tx commit currently in progress if the given transaction ID
* matches the current entry.
} else if(next.isExpired(cacheExpiryTimeoutInMillis)) {
log.warn("{}: canCommit for transaction {} was not received within {} ms - entry removed from cache",
name, next.getTransactionID(), cacheExpiryTimeoutInMillis);
-
- iter.remove();
- cohortCache.remove(next.getTransactionID());
- } else {
+ } else if(!next.isAborted()) {
break;
}
+
+ iter.remove();
+ cohortCache.remove(next.getTransactionID());
}
}
private boolean doImmediateCommit;
private final Stopwatch lastAccessTimer = Stopwatch.createStarted();
private int totalBatchedModificationsReceived;
+ private boolean aborted;
CohortEntry(String transactionID, ReadWriteShardDataTreeTransaction transaction) {
this.transaction = Preconditions.checkNotNull(transaction);
return transactionID;
}
- ShardDataTreeCohort getCohort() {
- return cohort;
+ DataTreeCandidate getCandidate() { // Returns the candidate held by the underlying cohort; the cohort itself is no longer exposed.
+ return cohort.getCandidate();
}
int getTotalBatchedModificationsReceived() {
}
}
+ boolean canCommit() throws InterruptedException, ExecutionException {
+ // We block on the future here (and likewise in preCommit(), commit() and abort()) so we don't have
+ // to worry about possibly accessing our state on a different thread outside of our dispatcher.
+ // TODO: the ShardDataTreeCohort returns immediate Futures anyway, which raises the question - why
+ // bother even returning Futures from ShardDataTreeCohort if we have to treat them synchronously
+ // anyway? The Futures are really a remnant from when we were using the InMemoryDataBroker.
+ return cohort.canCommit().get();
+ }
+
+ void preCommit() throws InterruptedException, ExecutionException {
+ cohort.preCommit().get(); // Blocks until the cohort's preCommit future completes.
+ }
+
+ void commit() throws InterruptedException, ExecutionException {
+ cohort.commit().get(); // Blocks until the cohort's commit future completes.
+ }
+
+ void abort() throws InterruptedException, ExecutionException {
+ aborted = true; // Set before the blocking call so the entry reads as aborted even if the cohort's abort throws.
+ cohort.abort().get(); // Blocks until the cohort's abort future completes.
+ }
+
void ready(CohortDecorator cohortDecorator, boolean doImmediateCommit) {
Preconditions.checkState(cohort == null, "cohort was already set");
this.shard = shard;
}
+
+ boolean isAborted() { // Reports the aborted flag, which abort() sets.
+ return aborted;
+ }
+
@Override
public String toString() {
StringBuilder builder = new StringBuilder();
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.verify;
import static org.opendaylight.controller.cluster.datastore.DataStoreVersions.CURRENT_VERSION;
import akka.actor.ActorRef;
import akka.actor.ActorSelection;
private static final String DUMMY_DATA = "Dummy data as snapshot sequence number is set to 0 in InMemorySnapshotStore and journal recovery seq number will start from 1";
- final CountDownLatch recoveryComplete = new CountDownLatch(1);
-
- protected Props newShardPropsWithRecoveryComplete() {
-
- final Creator<Shard> creator = new Creator<Shard>() {
- @Override
- public Shard create() throws Exception {
- return new Shard(shardID, Collections.<String,String>emptyMap(),
- newDatastoreContext(), SCHEMA_CONTEXT) {
- @Override
- protected void onRecoveryComplete() {
- try {
- super.onRecoveryComplete();
- } finally {
- recoveryComplete.countDown();
- }
- }
- };
- }
- };
- return Props.create(new DelegatingShardCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId());
- }
-
@Test
public void testRegisterChangeListener() throws Exception {
new ShardTestKit(getSystem()) {{
@Test
public void testApplySnapshot() throws Exception {
+
+ ShardTestKit testkit = new ShardTestKit(getSystem());
+
final TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(),
"testApplySnapshot");
+ testkit.waitUntilLeader(shard);
+
final DataTree store = InMemoryDataTreeFactory.getInstance().create();
store.setSchemaContext(SCHEMA_CONTEXT);
@Test
public void testApplyState() throws Exception {
+ ShardTestKit testkit = new ShardTestKit(getSystem());
+
final TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(), "testApplyState");
+ testkit.waitUntilLeader(shard);
+
final NormalizedNode<?, ?> node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
final ApplyState applyState = new ApplyState(null, "test", new ReplicatedLogImplEntry(1, 2,
@Test
public void testApplyStateWithCandidatePayload() throws Exception {
- final TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardPropsWithRecoveryComplete(), "testApplyState");
+ ShardTestKit testkit = new ShardTestKit(getSystem());
+
+ final TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(), "testApplyState");
- recoveryComplete.await(5, TimeUnit.SECONDS);
+ testkit.waitUntilLeader(shard);
final NormalizedNode<?, ?> node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
final DataTreeCandidate candidate = DataTreeCandidates.fromNormalizedNode(TestModel.TEST_PATH, node);
}
@Test
- public void testAbortTransaction() throws Throwable {
+ public void testAbortCurrentTransaction() throws Throwable {
new ShardTestKit(getSystem()) {{
final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
- "testAbortTransaction");
+ "testAbortCurrentTransaction");
waitUntilLeader(shard);
}};
}
+ @Test
+ public void testAbortQueuedTransaction() throws Throwable { // Aborts a readied Tx, then checks the reply, the cohort abort, the queue cleanup and a failing canCommit.
+ dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1); // presumably also shortens the timeout-check interval awaited below - confirm.
+ new ShardTestKit(getSystem()) {{
+ final AtomicReference<CountDownLatch> cleaupCheckLatch = new AtomicReference<>(); // Left null until the test wants to observe a timeout check.
+ @SuppressWarnings("serial")
+ final Creator<Shard> creator = new Creator<Shard>() {
+ @Override
+ public Shard create() throws Exception {
+ return new Shard(shardID, Collections.<String,String>emptyMap(),
+ dataStoreContextBuilder.build(), SCHEMA_CONTEXT) {
+ @Override
+ public void onReceiveCommand(final Object message) throws Exception {
+ super.onReceiveCommand(message); // Let the shard handle the message first so the latch fires after processing.
+ if(message.equals(TX_COMMIT_TIMEOUT_CHECK_MESSAGE)) {
+ if(cleaupCheckLatch.get() != null) {
+ cleaupCheckLatch.get().countDown();
+ }
+ }
+ }
+ };
+ }
+ };
+
+ final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
+ Props.create(new DelegatingShardCreator(creator)).withDispatcher(
+ Dispatchers.DefaultDispatcherId()), "testAbortQueuedTransaction");
+
+ waitUntilLeader(shard);
+
+ final String transactionID = "tx1";
+
+ final MutableCompositeModification modification = new MutableCompositeModification();
+ final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort");
+ doReturn(Futures.immediateFuture(null)).when(cohort).abort(); // Only abort() is stubbed; it returns an already-completed future.
+
+ final FiniteDuration duration = duration("5 seconds");
+
+ // Ready the tx.
+
+ shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
+ cohort, modification, true, false), getRef());
+ expectMsgClass(duration, ReadyTransactionReply.class);
+
+ assertEquals("getPendingTxCommitQueueSize", 1, shard.underlyingActor().getPendingTxCommitQueueSize());
+
+ // Send the AbortTransaction message.
+
+ shard.tell(new AbortTransaction(transactionID).toSerializable(), getRef());
+ expectMsgClass(duration, AbortTransactionReply.SERIALIZABLE_CLASS);
+
+ verify(cohort).abort();
+
+ // Verify the tx cohort is removed from queue at the cleanup check interval.
+
+ cleaupCheckLatch.set(new CountDownLatch(1)); // Armed only now, so the latch counts a check handled after the abort.
+ assertEquals("TX_COMMIT_TIMEOUT_CHECK_MESSAGE received", true,
+ cleaupCheckLatch.get().await(5, TimeUnit.SECONDS));
+
+ assertEquals("getPendingTxCommitQueueSize", 0, shard.underlyingActor().getPendingTxCommitQueueSize());
+
+ // Now send CanCommitTransaction - should fail.
+
+ shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
+
+ Throwable failure = expectMsgClass(duration, akka.actor.Status.Failure.class).cause();
+ assertTrue("Failure type", failure instanceof IllegalStateException);
+
+ shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
+ }};
+ }
+
@Test
public void testCreateSnapshot() throws Exception {
testCreateSnapshot(true, "testCreateSnapshot");