import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateTip;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
DataTreeCandidateTip getCandidate() {
return delegate.getCandidate();
}
+
+ @Override
+ DataTreeModification getDataTreeModification() {
+ return delegate.getDataTreeModification();
+ }
}
\ No newline at end of file
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;
import org.opendaylight.controller.cluster.common.actor.CommonConfig;
}
private void handleCommitTransaction(final CommitTransaction commit) {
- if(!commitCoordinator.handleCommit(commit.getTransactionID(), getSender(), this)) {
- shardMBean.incrementFailedTransactionsCount();
+ if (isLeader()) {
+ if(!commitCoordinator.handleCommit(commit.getTransactionID(), getSender(), this)) {
+ shardMBean.incrementFailedTransactionsCount();
+ }
+ } else {
+ ActorSelection leader = getLeader();
+ if (leader == null) {
+ messageRetrySupport.addMessageToRetry(commit, getSender(),
+ "Could not commit transaction " + commit.getTransactionID());
+ } else {
+ LOG.debug("{}: Forwarding CommitTransaction to leader {}", persistenceId(), leader);
+ leader.forward(commit, getContext());
+ }
}
}
LOG.debug("{}: Finishing commit for transaction {}", persistenceId(), cohortEntry.getTransactionID());
try {
- cohortEntry.commit();
+ try {
+ cohortEntry.commit();
+ } catch(ExecutionException e) {
+ // We may get a "store tree and candidate base differ" IllegalStateException from commit under
+ // certain edge-case scenarios so we'll try to re-apply the candidate from scratch as a last
+ // resort. E.g., we're a follower and a tx payload is replicated but the leader goes down before
+ // applying it to the state. We then become the leader and a second tx is pre-committed and
+ // replicated. When consensus occurs, this will cause the first tx to be applied as a foreign
+ // candidate via applyState prior to the second tx. Since the second tx has already been
+ // pre-committed, when it gets here to commit it will get an IllegalStateException.
+
+ // FIXME - this is not an ideal way to handle this scenario. This is temporary - a cleaner
+ // solution will be forthcoming.
+ if(e.getCause() instanceof IllegalStateException) {
+ LOG.debug("{}: commit failed for transaction {} - retrying as foreign candidate", persistenceId(),
+ transactionID, e);
+ store.applyForeignCandidate(transactionID, cohortEntry.getCandidate());
+ } else {
+ throw e;
+ }
+ }
sender.tell(CommitTransactionReply.instance(cohortEntry.getClientVersion()).toSerializable(), getSelf());
private void handleCanCommitTransaction(final CanCommitTransaction canCommit) {
LOG.debug("{}: Can committing transaction {}", persistenceId(), canCommit.getTransactionID());
- commitCoordinator.handleCanCommit(canCommit.getTransactionID(), getSender(), this);
+
+ if (isLeader()) {
+ commitCoordinator.handleCanCommit(canCommit.getTransactionID(), getSender(), this);
+ } else {
+ ActorSelection leader = getLeader();
+ if (leader == null) {
+ messageRetrySupport.addMessageToRetry(canCommit, getSender(),
+ "Could not canCommit transaction " + canCommit.getTransactionID());
+ } else {
+ LOG.debug("{}: Forwarding CanCommitTransaction to leader {}", persistenceId(), leader);
+ leader.forward(canCommit, getContext());
+ }
+ }
}
protected void handleBatchedModificationsLocal(BatchedModifications batched, ActorRef sender) {
}
store.closeAllTransactionChains();
-
- commitCoordinator.abortPendingTransactions(
- "The transacton was aborted due to inflight leadership change.", this);
}
if(hasLeader && !isIsolatedLeader()) {
protected void onLeaderChanged(String oldLeader, String newLeader) {
shardMBean.incrementLeadershipChangeCount();
- if(hasLeader() && !isIsolatedLeader()) {
+ boolean hasLeader = hasLeader();
+ if(hasLeader && !isLeader()) {
+ // Another leader was elected. If we were the previous leader and had pending transactions, convert
+ // them to transaction messages and send to the new leader.
+ ActorSelection leader = getLeader();
+ if(leader != null) {
+ Collection<Object> messagesToForward = commitCoordinator.convertPendingTransactionsToMessages(
+ datastoreContext.getShardBatchedModificationCount());
+
+ if(!messagesToForward.isEmpty()) {
+ LOG.debug("{}: Forwarding {} pending transaction messages to leader {}", persistenceId(),
+ messagesToForward.size(), leader);
+
+ for(Object message: messagesToForward) {
+ leader.tell(message, self());
+ }
+ }
+ } else {
+ commitCoordinator.abortPendingTransactions(
+ "The transacton was aborted due to inflight leadership change and the leader address isn't available.",
+ this);
+ }
+ }
+
+ if(hasLeader && !isIsolatedLeader()) {
messageRetrySupport.retryMessages();
}
}
import java.util.Queue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
+import org.opendaylight.controller.cluster.datastore.ShardCommitCoordinator.CohortEntry.State;
import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
+import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
+import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction;
import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
import org.opendaylight.controller.cluster.datastore.utils.AbstractBatchedModificationsCursor;
import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
import org.slf4j.Logger;
/**
doCanCommit(currentCohortEntry);
} else {
if(log.isDebugEnabled()) {
- log.debug("{}: Tx {} is the next pending canCommit - skipping {} for now",
- name, queuedCohortEntries.peek().getTransactionID(), transactionID);
+ log.debug("{}: Tx {} is the next pending canCommit - skipping {} for now", name,
+ queuedCohortEntries.peek() != null ? queuedCohortEntries.peek().getTransactionID() : "???",
+ transactionID);
}
}
}
"Can Commit failed, no detailed cause available.")), cohortEntry.getShard().self());
}
} else {
- // FIXME - use caller's version
cohortEntry.getReplySender().tell(
canCommit ? CanCommitTransactionReply.yes(cohortEntry.getClientVersion()).toSerializable() :
CanCommitTransactionReply.no(cohortEntry.getClientVersion()).toSerializable(),
return;
}
- List<CohortEntry> cohortEntries = new ArrayList<>();
+ List<CohortEntry> cohortEntries = getAndClearPendingCohortEntries();
+
+ log.debug("{}: Aborting {} pending queued transactions", name, cohortEntries.size());
+
+ for(CohortEntry cohortEntry: cohortEntries) {
+ if(cohortEntry.getReplySender() != null) {
+ cohortEntry.getReplySender().tell(new Failure(new RuntimeException(reason)), shard.self());
+ }
+ }
+ }
+ private List<CohortEntry> getAndClearPendingCohortEntries() {
+ List<CohortEntry> cohortEntries = new ArrayList<>();
if(currentCohortEntry != null) {
cohortEntries.add(currentCohortEntry);
+ cohortCache.remove(currentCohortEntry.getTransactionID());
currentCohortEntry = null;
}
- cohortEntries.addAll(queuedCohortEntries);
+ for(CohortEntry cohortEntry: queuedCohortEntries) {
+ cohortEntries.add(cohortEntry);
+ cohortCache.remove(cohortEntry.getTransactionID());
+ }
+
queuedCohortEntries.clear();
+ return cohortEntries;
+ }
+ Collection<Object> convertPendingTransactionsToMessages(final int maxModificationsPerBatch) {
+ if(currentCohortEntry == null && queuedCohortEntries.isEmpty()) {
+ return Collections.emptyList();
+ }
+
+ Collection<Object> messages = new ArrayList<>();
+ List<CohortEntry> cohortEntries = getAndClearPendingCohortEntries();
for(CohortEntry cohortEntry: cohortEntries) {
- if(cohortEntry.getReplySender() != null) {
- cohortEntry.getReplySender().tell(new Failure(new RuntimeException(reason)), shard.self());
+ if(cohortEntry.isExpired(cacheExpiryTimeoutInMillis) || cohortEntry.isAborted()) {
+ continue;
+ }
+
+ final LinkedList<BatchedModifications> newModifications = new LinkedList<>();
+ cohortEntry.getDataTreeModification().applyToCursor(new AbstractBatchedModificationsCursor() {
+ @Override
+ protected BatchedModifications getModifications() {
+ if(newModifications.isEmpty() ||
+ newModifications.getLast().getModifications().size() >= maxModificationsPerBatch) {
+ newModifications.add(new BatchedModifications(cohortEntry.getTransactionID(),
+ cohortEntry.getClientVersion(), ""));
+ }
+
+ return newModifications.getLast();
+ }
+ });
+
+ if(!newModifications.isEmpty()) {
+ BatchedModifications last = newModifications.getLast();
+ last.setDoCommitOnReady(cohortEntry.isDoImmediateCommit());
+ last.setReady(true);
+ last.setTotalMessagesSent(newModifications.size());
+ messages.addAll(newModifications);
+
+ if(!cohortEntry.isDoImmediateCommit() && cohortEntry.getState() == State.CAN_COMMITTED) {
+ messages.add(new CanCommitTransaction(cohortEntry.getTransactionID(),
+ cohortEntry.getClientVersion()));
+ }
+
+ if(!cohortEntry.isDoImmediateCommit() && cohortEntry.getState() == State.PRE_COMMITTED) {
+ messages.add(new CommitTransaction(cohortEntry.getTransactionID(),
+ cohortEntry.getClientVersion()));
+ }
}
}
+
+ return messages;
}
/**
}
static class CohortEntry {
+ enum State {
+ PENDING,
+ CAN_COMMITTED,
+ PRE_COMMITTED,
+ COMMITTED,
+ ABORTED
+ }
+
private final String transactionID;
private ShardDataTreeCohort cohort;
private final ReadWriteShardDataTreeTransaction transaction;
private boolean doImmediateCommit;
private final Stopwatch lastAccessTimer = Stopwatch.createStarted();
private int totalBatchedModificationsReceived;
- private boolean aborted;
+ private State state = State.PENDING;
private final short clientVersion;
CohortEntry(String transactionID, ReadWriteShardDataTreeTransaction transaction, short clientVersion) {
return clientVersion;
}
+ State getState() {
+ return state;
+ }
+
DataTreeCandidate getCandidate() {
return cohort.getCandidate();
}
+ DataTreeModification getDataTreeModification() {
+ return cohort.getDataTreeModification();
+ }
+
ReadWriteShardDataTreeTransaction getTransaction() {
return transaction;
}
}
boolean canCommit() throws InterruptedException, ExecutionException {
+ state = State.CAN_COMMITTED;
+
// We block on the future here (and also preCommit(), commit(), abort()) so we don't have to worry
// about possibly accessing our state on a different thread outside of our dispatcher.
// TODO: the ShardDataTreeCohort returns immediate Futures anyway which begs the question - why
}
void preCommit() throws InterruptedException, ExecutionException {
+ state = State.PRE_COMMITTED;
cohort.preCommit().get();
}
void commit() throws InterruptedException, ExecutionException {
+ state = State.COMMITTED;
cohort.commit().get();
}
void abort() throws InterruptedException, ExecutionException {
- aborted = true;
+ state = State.ABORTED;
cohort.abort().get();
}
boolean isAborted() {
- return aborted;
+ return state == State.ABORTED;
}
@Override
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ListenableFuture;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateTip;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
public abstract class ShardDataTreeCohort {
ShardDataTreeCohort() {
}
abstract DataTreeCandidateTip getCandidate();
+ abstract DataTreeModification getDataTreeModification();
@VisibleForTesting
public abstract ListenableFuture<Boolean> canCommit();
@Override
public ListenableFuture<Boolean> canCommit() {
- DataTreeModification modification = dataTreeModification();
+ DataTreeModification modification = getDataTreeModification();
try {
dataTree.getDataTree().validate(modification);
LOG.trace("Transaction {} validated", transaction);
@Override
public ListenableFuture<Void> preCommit() {
try {
- candidate = dataTree.getDataTree().prepare(dataTreeModification());
+ candidate = dataTree.getDataTree().prepare(getDataTreeModification());
/*
* FIXME: this is the place where we should be interacting with persistence, specifically by invoking
* persist on the candidate (which gives us a Future).
}
}
- private DataTreeModification dataTreeModification() {
+ @Override
+ DataTreeModification getDataTreeModification() {
DataTreeModification dataTreeModification = transaction;
if(transaction instanceof PruningDataTreeModification){
dataTreeModification = ((PruningDataTreeModification) transaction).getResultingModification();
import com.google.common.base.Optional;
import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableMap;
+import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.Uninterruptibles;
import com.typesafe.config.ConfigFactory;
public void testTransactionForwardedToLeaderAfterRetry() throws Exception {
followerDatastoreContextBuilder.shardBatchedModificationCount(2);
leaderDatastoreContextBuilder.shardBatchedModificationCount(2);
- initDatastoresWithCars("testTransactionForwardedToLeaderAfterRetry");
+ initDatastoresWithCarsAndPeople("testTransactionForwardedToLeaderAfterRetry");
// Do an initial write to get the primary shard info cached.
- DOMStoreWriteTransaction writeTx1 = followerDistributedDataStore.newWriteOnlyTransaction();
- writeTx1.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
- writeTx1.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
- followerTestKit.doCommit(writeTx1.ready());
+ DOMStoreWriteTransaction initialWriteTx = followerDistributedDataStore.newWriteOnlyTransaction();
+ initialWriteTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
+ initialWriteTx.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer());
+ followerTestKit.doCommit(initialWriteTx.ready());
// Wait for the commit to be replicated to the follower.
}
});
- // Create and prepare wo and rw tx's.
+ // Prepare, ready and canCommit a WO tx that writes to 2 shards. This will become the current tx in
+ // the leader shard.
+
+ DOMStoreWriteTransaction writeTx1 = followerDistributedDataStore.newWriteOnlyTransaction();
+ writeTx1.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
+ writeTx1.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer());
+ DOMStoreThreePhaseCommitCohort writeTx1Cohort = writeTx1.ready();
+ ListenableFuture<Boolean> writeTx1CanCommit = writeTx1Cohort.canCommit();
+ writeTx1CanCommit.get(5, TimeUnit.SECONDS);
+
+ // Prepare and ready another WO tx that writes to 2 shards but don't canCommit yet. This will be queued
+ // in the leader shard.
- writeTx1 = followerDistributedDataStore.newWriteOnlyTransaction();
+ DOMStoreWriteTransaction writeTx2 = followerDistributedDataStore.newWriteOnlyTransaction();
LinkedList<MapEntryNode> cars = new LinkedList<>();
int carIndex = 1;
- for(; carIndex <= 5; carIndex++) {
+ cars.add(CarsModel.newCarEntry("car" + carIndex, BigInteger.valueOf(carIndex)));
+ writeTx2.write(CarsModel.newCarPath("car" + carIndex), cars.getLast());
+ carIndex++;
+ NormalizedNode<?, ?> people = PeopleModel.newPersonMapNode();
+ writeTx2.write(PeopleModel.PERSON_LIST_PATH, people);
+ DOMStoreThreePhaseCommitCohort writeTx2Cohort = writeTx2.ready();
+
+ // Prepare another WO tx that writes to a single shard and thus will be directly committed on ready. This
+ // tx writes 5 cars so 2 BatchedModifications messages will be sent initially and cached in the
+ // leader shard (with shardBatchedModificationCount set to 2). The 3rd BatchedModifications will be
+ // sent on ready.
+
+ DOMStoreWriteTransaction writeTx3 = followerDistributedDataStore.newWriteOnlyTransaction();
+ for(int i = 1; i <= 5; i++, carIndex++) {
cars.add(CarsModel.newCarEntry("car" + carIndex, BigInteger.valueOf(carIndex)));
- writeTx1.write(CarsModel.newCarPath("car" + carIndex), cars.getLast());
+ writeTx3.write(CarsModel.newCarPath("car" + carIndex), cars.getLast());
}
- DOMStoreWriteTransaction writeTx2 = followerDistributedDataStore.newWriteOnlyTransaction();
+ // Prepare another WO tx that writes to a single shard. This will send a single BatchedModifications
+ // message on ready.
+
+ DOMStoreWriteTransaction writeTx4 = followerDistributedDataStore.newWriteOnlyTransaction();
cars.add(CarsModel.newCarEntry("car" + carIndex, BigInteger.valueOf(carIndex)));
- writeTx2.write(CarsModel.newCarPath("car" + carIndex), cars.getLast());
+ writeTx4.write(CarsModel.newCarPath("car" + carIndex), cars.getLast());
carIndex++;
+ // Prepare a RW tx that will create a tx actor and send a ForwardedReadyTransaction message to the
+ // leader shard on ready.
+
DOMStoreReadWriteTransaction readWriteTx = followerDistributedDataStore.newReadWriteTransaction();
cars.add(CarsModel.newCarEntry("car" + carIndex, BigInteger.valueOf(carIndex)));
readWriteTx.write(CarsModel.newCarPath("car" + carIndex), cars.getLast());
leaderTestKit.waitUntilNoLeader(leaderDistributedDataStore.getActorContext(), "cars");
- // Submit tx's and enable elections on the follower so it becomes the leader, at which point the
- // readied tx's should get forwarded from the previous leader.
+ // Submit all tx's - the messages should get queued for retry.
+
+ ListenableFuture<Boolean> writeTx2CanCommit = writeTx2Cohort.canCommit();
+ DOMStoreThreePhaseCommitCohort writeTx3Cohort = writeTx3.ready();
+ DOMStoreThreePhaseCommitCohort writeTx4Cohort = writeTx4.ready();
+ DOMStoreThreePhaseCommitCohort rwTxCohort = readWriteTx.ready();
- DOMStoreThreePhaseCommitCohort cohort1 = writeTx1.ready();
- DOMStoreThreePhaseCommitCohort cohort2 = writeTx2.ready();
- DOMStoreThreePhaseCommitCohort cohort3 = readWriteTx.ready();
+ // Enable elections on the other follower so it becomes the leader, at which point the
+ // tx's should get forwarded from the previous leader to the new leader to complete the commits.
sendDatastoreContextUpdate(followerDistributedDataStore, followerDatastoreContextBuilder.
customRaftPolicyImplementation(null).shardElectionTimeoutFactor(1));
- followerTestKit.doCommit(cohort1);
- followerTestKit.doCommit(cohort2);
- followerTestKit.doCommit(cohort3);
+ followerTestKit.doCommit(writeTx1CanCommit, writeTx1Cohort);
+ followerTestKit.doCommit(writeTx2CanCommit, writeTx2Cohort);
+ followerTestKit.doCommit(writeTx3Cohort);
+ followerTestKit.doCommit(writeTx4Cohort);
+ followerTestKit.doCommit(rwTxCohort);
- verifyCars(leaderDistributedDataStore.newReadOnlyTransaction(), cars.toArray(new MapEntryNode[cars.size()]));
+ DOMStoreReadTransaction readTx = leaderDistributedDataStore.newReadOnlyTransaction();
+ verifyCars(readTx, cars.toArray(new MapEntryNode[cars.size()]));
+ verifyNode(readTx, PeopleModel.PERSON_LIST_PATH, people);
}
@Test
final ShardDataTreeCohort cohort1 = mock(ShardDataTreeCohort.class, "cohort1");
doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
doReturn(Futures.immediateFuture(null)).when(cohort1).preCommit();
- doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort1).commit();
+ doReturn(Futures.immediateFailedFuture(new RuntimeException("mock"))).when(cohort1).commit();
doReturn(mockCandidate("cohort1-candidate")).when(cohort1).getCandidate();
final String transactionID2 = "tx2";