import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
+import java.util.SortedSet;
import javax.annotation.Nullable;
import org.opendaylight.controller.cluster.access.commands.AbstractReadTransactionRequest;
import org.opendaylight.controller.cluster.access.commands.ClosedTransactionException;
abstract ShardDataTreeCohort createFailedCohort(TransactionIdentifier id, DataTreeModification mod,
Exception failure);
- abstract ShardDataTreeCohort createReadyCohort(TransactionIdentifier id, DataTreeModification mod);
+ abstract ShardDataTreeCohort createReadyCohort(TransactionIdentifier id, DataTreeModification mod,
+ Optional<SortedSet<String>> participatingShardNames);
@Override
public String toString() {
import com.google.common.base.Preconditions;
import com.google.common.primitives.UnsignedLong;
import com.google.common.util.concurrent.FutureCallback;
+import java.util.Optional;
+import java.util.SortedSet;
import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateTip;
public State getState() {
return delegate.getState();
}
+
+ @Override
+ // Pure delegation: this wrapper cohort tracks no shard names of its own, so
+ // the participating-shard set is whatever the wrapped cohort reports.
+ Optional<SortedSet<String>> getParticipatingShardNames() {
+ return delegate.getParticipatingShardNames();
+ }
}
import com.google.common.base.Preconditions;
import com.google.common.primitives.UnsignedLong;
import com.google.common.util.concurrent.FutureCallback;
+import java.util.Optional;
+import java.util.SortedSet;
import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.ShardCommitCoordinator.CohortDecorator;
import org.opendaylight.controller.cluster.datastore.modification.Modification;
cohort.abort(callback);
}
- void ready(final CohortDecorator cohortDecorator) {
+ void ready(final Optional<SortedSet<String>> participatingShardNames, final CohortDecorator cohortDecorator) {
Preconditions.checkState(cohort == null, "cohort was already set");
- cohort = transaction.ready();
+ cohort = transaction.ready(participatingShardNames);
if (cohortDecorator != null) {
// Call the hook for unit tests.
}
}
+ // Returns the participating shard names recorded on the underlying cohort,
+ // or Optional.empty() when ready(..) has not yet been called (cohort == null).
+ Optional<SortedSet<String>> getParticipatingShardNames() {
+ return cohort != null ? cohort.getParticipatingShardNames() : Optional.empty();
+ }
+
boolean isDoImmediateCommit() {
return doImmediateCommit;
}
short HELIUM_2_VERSION = 2;
short LITHIUM_VERSION = 3;
short BORON_VERSION = 5;
- short CURRENT_VERSION = BORON_VERSION;
+ short FLUORINE_VERSION = 9;
+ short CURRENT_VERSION = FLUORINE_VERSION;
}
if (optFailure.isPresent()) {
state = new Ready(history().createFailedCohort(getIdentifier(), sealedModification, optFailure.get()));
} else {
- state = new Ready(history().createReadyCohort(getIdentifier(), sealedModification));
+ state = new Ready(history().createReadyCohort(getIdentifier(), sealedModification,
+ java.util.Optional.empty()));
}
if (request.isCoordinated()) {
}
applyModifications(modifications);
- state = new Ready(checkOpen().ready());
+ state = new Ready(checkOpen().ready(java.util.Optional.empty()));
LOG.debug("{}: transitioned {} to ready", persistenceId(), getIdentifier());
}
import com.google.common.primitives.UnsignedLong;
import java.util.HashMap;
import java.util.Map;
+import java.util.Optional;
+import java.util.SortedSet;
import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
import org.opendaylight.controller.cluster.access.concepts.RequestException;
import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
}
@Override
- ShardDataTreeCohort createReadyCohort(final TransactionIdentifier id, final DataTreeModification mod) {
- return chain.createReadyCohort(id, mod);
+ ShardDataTreeCohort createReadyCohort(final TransactionIdentifier id, final DataTreeModification mod,
+ final Optional<SortedSet<String>> participatingShardNames) {
+ return chain.createReadyCohort(id, mod, participatingShardNames);
}
}
import akka.dispatch.OnComplete;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.ListenableFuture;
+import java.util.Optional;
+import java.util.SortedSet;
import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction;
this.modification = null;
}
- private Future<Object> initiateCommit(final boolean immediate) {
+ private Future<Object> initiateCommit(final boolean immediate,
+ final Optional<SortedSet<String>> participatingShardNames) {
if (operationError != null) {
return Futures.failed(operationError);
}
final ReadyLocalTransaction message = new ReadyLocalTransaction(transaction.getIdentifier(),
- modification, immediate);
+ modification, immediate, participatingShardNames);
return actorContext.executeOperationAsync(leader, message, actorContext.getTransactionCommitOperationTimeout());
}
- Future<ActorSelection> initiateCoordinatedCommit() {
- final Future<Object> messageFuture = initiateCommit(false);
+ Future<ActorSelection> initiateCoordinatedCommit(Optional<SortedSet<String>> participatingShardNames) {
+ final Future<Object> messageFuture = initiateCommit(false, participatingShardNames);
final Future<ActorSelection> ret = TransactionReadyReplyMapper.transform(messageFuture, actorContext,
transaction.getIdentifier());
ret.onComplete(new OnComplete<ActorSelection>() {
}
Future<Object> initiateDirectCommit() {
- final Future<Object> messageFuture = initiateCommit(true);
+ final Future<Object> messageFuture = initiateCommit(true, Optional.empty());
messageFuture.onComplete(new OnComplete<Object>() {
@Override
public void onComplete(final Throwable failure, final Object message) throws Throwable {
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
+import java.util.Optional;
+import java.util.SortedSet;
import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.messages.AbstractRead;
import org.opendaylight.controller.cluster.datastore.modification.AbstractModification;
}
@Override
- public Future<ActorSelection> readyTransaction(final Boolean havePermit) {
+ public Future<ActorSelection> readyTransaction(final Boolean havePermit,
+ final Optional<SortedSet<String>> participatingShardNames) {
final LocalThreePhaseCommitCohort cohort = ready();
- return cohort.initiateCoordinatedCommit();
+ return cohort.initiateCoordinatedCommit(participatingShardNames);
}
@Override
import akka.actor.ActorSelection;
import com.google.common.util.concurrent.SettableFuture;
+import java.util.Optional;
+import java.util.SortedSet;
import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
import org.opendaylight.controller.cluster.datastore.messages.AbstractRead;
}
@Override
- public Future<ActorSelection> readyTransaction(final Boolean havePermit) {
+ // Failure-mode implementation: the transaction has already failed, so the
+ // ready request is answered immediately with the recorded failure. The
+ // participating shard names are accepted only for interface compatibility
+ // and are intentionally unused here. (Fixed parameter-name typo:
+ // "participatingShardNamess" -> "participatingShardNames".)
+ public Future<ActorSelection> readyTransaction(final Boolean havePermit,
+ final Optional<SortedSet<String>> participatingShardNames) {
LOG.debug("Tx {} readyTransaction called, failure: {}", getIdentifier(), failure);
return akka.dispatch.Futures.failed(failure);
}
package org.opendaylight.controller.cluster.datastore;
import com.google.common.base.Preconditions;
+import java.util.Optional;
+import java.util.SortedSet;
import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
super(parent, id, modification);
}
- ShardDataTreeCohort ready() {
+ ShardDataTreeCohort ready(Optional<SortedSet<String>> participatingShardNames) {
Preconditions.checkState(close(), "Transaction is already closed");
- return getParent().finishTransaction(this);
+ return getParent().finishTransaction(this, participatingShardNames);
}
}
import akka.dispatch.OnComplete;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.SettableFuture;
+import java.util.Optional;
+import java.util.SortedSet;
import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.messages.AbstractRead;
import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
// Send the remaining batched modifications, if any, with the ready flag set.
bumpPermits(havePermit);
- return sendBatchedModifications(true, true);
+ return sendBatchedModifications(true, true, Optional.empty());
}
@Override
- public Future<ActorSelection> readyTransaction(final Boolean havePermit) {
+ public Future<ActorSelection> readyTransaction(final Boolean havePermit,
+ final Optional<SortedSet<String>> participatingShardNames) {
logModificationCount();
LOG.debug("Tx {} readyTransaction called", getIdentifier());
// Send the remaining batched modifications, if any, with the ready flag set.
bumpPermits(havePermit);
- Future<Object> lastModificationsFuture = sendBatchedModifications(true, false);
+ Future<Object> lastModificationsFuture = sendBatchedModifications(true, false, participatingShardNames);
return transformReadyReply(lastModificationsFuture);
}
}
protected Future<Object> sendBatchedModifications() {
- return sendBatchedModifications(false, false);
+ return sendBatchedModifications(false, false, Optional.empty());
}
- protected Future<Object> sendBatchedModifications(final boolean ready, final boolean doCommitOnReady) {
+ protected Future<Object> sendBatchedModifications(final boolean ready, final boolean doCommitOnReady,
+ final Optional<SortedSet<String>> participatingShardNames) {
Future<Object> sent = null;
if (ready || batchedModifications != null && !batchedModifications.getModifications().isEmpty()) {
if (batchedModifications == null) {
LOG.debug("Tx {} sending {} batched modifications, ready: {}", getIdentifier(),
batchedModifications.getModifications().size(), ready);
- batchedModifications.setReady(ready);
batchedModifications.setDoCommitOnReady(doCommitOnReady);
batchedModifications.setTotalMessagesSent(++totalBatchedModificationsSent);
batchPermits = 0;
if (ready) {
+ batchedModifications.setReady(participatingShardNames);
batchedModifications = null;
} else {
batchedModifications = newBatchedModifications();
/// The name of this shard
private final String name;
+ private final String shardName;
+
private final ShardStats shardMBean;
private final ShardDataTreeListenerInfoMXBeanImpl listenerInfoMXBean;
Optional.of(builder.getDatastoreContext().getShardRaftConfig()), DataStoreVersions.CURRENT_VERSION);
this.name = builder.getId().toString();
+ this.shardName = builder.getId().getShardName();
this.datastoreContext = builder.getDatastoreContext();
this.restoreFromSnapshot = builder.getRestoreFromSnapshot();
this.frontendMetadata = new FrontendMetadata(name);
return roleChangeNotifier;
}
+ // Returns the bare shard name taken from the shard's identifier in the
+ // constructor, as opposed to 'name' which holds the full identifier string;
+ // used for comparisons against participating-shard-name sets.
+ String getShardName() {
+ return shardName;
+ }
+
@Override
protected LeaderStateChanged newLeaderStateChanged(final String memberId, final String leaderId,
final short leaderPayloadVersion) {
LOG.debug("{}: Forwarding ForwardedReadyTransaction to leader {}", persistenceId(), leader);
ReadyLocalTransaction readyLocal = new ReadyLocalTransaction(forwardedReady.getTransactionId(),
- forwardedReady.getTransaction().getSnapshot(), forwardedReady.isDoImmediateCommit());
+ forwardedReady.getTransaction().getSnapshot(), forwardedReady.isDoImmediateCommit(),
+ forwardedReady.getParticipatingShardNames());
readyLocal.setRemoteVersion(getCurrentBehavior().getLeaderPayloadVersion());
leader.forward(readyLocal, getContext());
}
messagesToForward.size(), leader);
for (Object message : messagesToForward) {
+ LOG.debug("{}: Forwarding pending transaction message {}", persistenceId(), message);
+
leader.tell(message, self());
}
}
log.debug("{}: Readying transaction {}, client version {}", name,
ready.getTransactionId(), ready.getTxnClientVersion());
- final ShardDataTreeCohort cohort = ready.getTransaction().ready();
+ final ShardDataTreeCohort cohort = ready.getTransaction().ready(ready.getParticipatingShardNames());
final CohortEntry cohortEntry = CohortEntry.createReady(cohort, ready.getTxnClientVersion());
cohortCache.put(cohortEntry.getTransactionId(), cohortEntry);
}
cohortEntry.setDoImmediateCommit(batched.isDoCommitOnReady());
- cohortEntry.ready(cohortDecorator);
+ cohortEntry.ready(batched.getParticipatingShardNames(), cohortDecorator);
if (batched.isDoCommitOnReady()) {
cohortEntry.setReplySender(sender);
*/
void handleReadyLocalTransaction(final ReadyLocalTransaction message, final ActorRef sender, final Shard shard) {
final TransactionIdentifier txId = message.getTransactionId();
- final ShardDataTreeCohort cohort = dataTree.newReadyCohort(txId, message.getModification());
+ final ShardDataTreeCohort cohort = dataTree.newReadyCohort(txId, message.getModification(),
+ message.getParticipatingShardNames());
final CohortEntry cohortEntry = CohortEntry.createReady(cohort, DataStoreVersions.CURRENT_VERSION);
cohortCache.put(cohortEntry.getTransactionId(), cohortEntry);
cohortEntry.setDoImmediateCommit(message.isDoCommitOnReady());
BatchedModifications last = newModifications.getLast();
last.setDoCommitOnReady(from.isDoCommitOnReady());
- last.setReady(from.isReady());
+ if (from.isReady()) {
+ last.setReady(from.getParticipatingShardNames());
+ }
last.setTotalMessagesSent(newModifications.size());
return newModifications;
}
if (last != null) {
final boolean immediate = cohortEntry.isDoImmediateCommit();
last.setDoCommitOnReady(immediate);
- last.setReady(true);
+ last.setReady(cohortEntry.getParticipatingShardNames());
last.setTotalMessagesSent(newMessages.size());
messages.addAll(newMessages);
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
+import java.util.Deque;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Queue;
+import java.util.SortedSet;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
this.cohort = Preconditions.checkNotNull(cohort);
lastAccess = now;
}
+
+ // Renders the pending transaction's id and current cohort state, primarily
+ // for queue-related debug logging.
+ @Override
+ public String toString() {
+ return "CommitEntry [tx=" + cohort.getIdentifier() + ", state=" + cohort.getState() + "]";
+ }
}
private static final Timeout COMMIT_STEP_TIMEOUT = new Timeout(Duration.create(5, TimeUnit.SECONDS));
private final Map<LocalHistoryIdentifier, ShardDataTreeTransactionChain> transactionChains = new HashMap<>();
private final DataTreeCohortActorRegistry cohortRegistry = new DataTreeCohortActorRegistry();
- private final Queue<CommitEntry> pendingTransactions = new ArrayDeque<>();
+ private final Deque<CommitEntry> pendingTransactions = new ArrayDeque<>();
private final Queue<CommitEntry> pendingCommits = new ArrayDeque<>();
private final Queue<CommitEntry> pendingFinishCommits = new ArrayDeque<>();
}
@Override
- ShardDataTreeCohort finishTransaction(final ReadWriteShardDataTreeTransaction transaction) {
+ ShardDataTreeCohort finishTransaction(final ReadWriteShardDataTreeTransaction transaction,
+ final java.util.Optional<SortedSet<String>> participatingShardNames) {
final DataTreeModification snapshot = transaction.getSnapshot();
snapshot.ready();
- return createReadyCohort(transaction.getIdentifier(), snapshot);
+ return createReadyCohort(transaction.getIdentifier(), snapshot, participatingShardNames);
}
void purgeTransaction(final TransactionIdentifier id, final Runnable callback) {
return;
}
if (!cohort.equals(head.cohort)) {
- LOG.debug("{}: Transaction {} scheduled for canCommit step", logContext, cohort.getIdentifier());
- return;
+ // The tx isn't at the head of the queue so we can't start canCommit at this point. Here we check if this
+ // tx should be moved ahead of other tx's in the READY state in the pendingTransactions queue. If this tx
+ // has other participating shards, it could deadlock with other tx's accessing the same shards
+ // depending on the order the tx's are readied on each shard
+ // (see https://jira.opendaylight.org/browse/CONTROLLER-1836). Therefore, if the preceding participating
+ // shard names for a preceding pending tx, call it A, in the queue matches that of this tx, then this tx
+ // is allowed to be moved ahead of tx A in the queue so it is processed first to avoid potential deadlock
+ // if tx A is behind this tx in the pendingTransactions queue for a preceding shard. In other words, since
+ // canCommmit for this tx was requested before tx A, honor that request. If this tx is moved to the head of
+ // the queue as a result, then proceed with canCommit.
+
+ Collection<String> precedingShardNames = extractPrecedingShardNames(cohort.getParticipatingShardNames());
+ if (precedingShardNames.isEmpty()) {
+ LOG.debug("{}: Tx {} is scheduled for canCommit step", logContext, cohort.getIdentifier());
+ return;
+ }
+
+ LOG.debug("{}: Evaluating tx {} for canCommit - preceding participating shard names {}",
+ logContext, cohort.getIdentifier(), precedingShardNames);
+ final Iterator<CommitEntry> iter = pendingTransactions.iterator();
+ int index = -1;
+ int moveToIndex = -1;
+ while (iter.hasNext()) {
+ final CommitEntry entry = iter.next();
+ ++index;
+
+ if (cohort.equals(entry.cohort)) {
+ if (moveToIndex < 0) {
+ LOG.debug("{}: Not moving tx {} - cannot proceed with canCommit",
+ logContext, cohort.getIdentifier());
+ return;
+ }
+
+ LOG.debug("{}: Moving {} to index {} in the pendingTransactions queue",
+ logContext, cohort.getIdentifier(), moveToIndex);
+ iter.remove();
+ insertEntry(pendingTransactions, entry, moveToIndex);
+
+ if (!cohort.equals(pendingTransactions.peek().cohort)) {
+ LOG.debug("{}: Tx {} is not at the head of the queue - cannot proceed with canCommit",
+ logContext, cohort.getIdentifier());
+ return;
+ }
+
+ LOG.debug("{}: Tx {} is now at the head of the queue - proceeding with canCommit",
+ logContext, cohort.getIdentifier());
+ break;
+ }
+
+ if (entry.cohort.getState() != State.READY) {
+ LOG.debug("{}: Skipping pending transaction {} in state {}",
+ logContext, entry.cohort.getIdentifier(), entry.cohort.getState());
+ continue;
+ }
+
+ final Collection<String> pendingPrecedingShardNames = extractPrecedingShardNames(
+ entry.cohort.getParticipatingShardNames());
+
+ if (precedingShardNames.equals(pendingPrecedingShardNames)) {
+ if (moveToIndex < 0) {
+ LOG.debug("{}: Preceding shard names {} for pending tx {} match - saving moveToIndex {}",
+ logContext, pendingPrecedingShardNames, entry.cohort.getIdentifier(), index);
+ moveToIndex = index;
+ } else {
+ LOG.debug(
+ "{}: Preceding shard names {} for pending tx {} match but moveToIndex already set to {}",
+ logContext, pendingPrecedingShardNames, entry.cohort.getIdentifier(), moveToIndex);
+ }
+ } else {
+ LOG.debug("{}: Preceding shard names {} for pending tx {} differ - skipping",
+ logContext, pendingPrecedingShardNames, entry.cohort.getIdentifier());
+ }
+ }
}
processNextPendingTransaction();
}
+ // Inserts entry at position atIndex in the queue. Deque offers no
+ // random-access insert, so the atIndex preceding entries are popped onto a
+ // temporary stack, the new entry is placed at the head, and the stack is
+ // unwound back on top of it — push(..) and head-first forEach reverse each
+ // other, restoring the original relative order of the displaced entries.
+ private void insertEntry(Deque<CommitEntry> queue, CommitEntry entry, int atIndex) {
+ if (atIndex == 0) {
+ queue.addFirst(entry);
+ return;
+ }
+
+ LOG.trace("Inserting into Deque at index {}", atIndex);
+
+ Deque<CommitEntry> tempStack = new ArrayDeque<>(atIndex);
+ for (int i = 0; i < atIndex; i++) {
+ tempStack.push(queue.poll());
+ }
+
+ queue.addFirst(entry);
+
+ tempStack.forEach(queue::addFirst);
+ }
+
+ // Returns the subset of participating shard names that sort strictly before
+ // this shard's own name (SortedSet.headSet is exclusive of the bound), or an
+ // empty list when no participating shard names were supplied. The explicit
+ // Function cast is needed to widen the lambda's SortedSet result to
+ // Collection<String> for orElse.
+ private Collection<String> extractPrecedingShardNames(
+ java.util.Optional<SortedSet<String>> participatingShardNames) {
+ return participatingShardNames.map((Function<SortedSet<String>, Collection<String>>)
+ set -> set.headSet(shard.getShardName())).orElse(Collections.<String>emptyList());
+ }
+
private void failPreCommit(final Throwable cause) {
shard.getShardMBean().incrementFailedTransactionsCount();
pendingTransactions.poll().cohort.failedPreCommit(cause);
}
@Override
- ShardDataTreeCohort createReadyCohort(final TransactionIdentifier txId, final DataTreeModification mod) {
+ ShardDataTreeCohort createReadyCohort(final TransactionIdentifier txId, final DataTreeModification mod,
+ final java.util.Optional<SortedSet<String>> participatingShardNames) {
SimpleShardDataTreeCohort cohort = new SimpleShardDataTreeCohort(this, mod, txId,
cohortRegistry.createCohort(schemaContext, txId, runnable -> shard.executeInSelf(runnable),
- COMMIT_STEP_TIMEOUT));
+ COMMIT_STEP_TIMEOUT), participatingShardNames);
pendingTransactions.add(new CommitEntry(cohort, readTime()));
return cohort;
}
// Exposed for ShardCommitCoordinator so it does not have deal with local histories (it does not care), this mimics
// the newReadWriteTransaction()
- ShardDataTreeCohort newReadyCohort(final TransactionIdentifier txId, final DataTreeModification mod) {
+ ShardDataTreeCohort newReadyCohort(final TransactionIdentifier txId, final DataTreeModification mod,
+ final java.util.Optional<SortedSet<String>> participatingShardNames) {
if (txId.getHistoryId().getHistoryId() == 0) {
- return createReadyCohort(txId, mod);
+ return createReadyCohort(txId, mod, participatingShardNames);
}
- return ensureTransactionChain(txId.getHistoryId(), null).createReadyCohort(txId, mod);
+ return ensureTransactionChain(txId.getHistoryId(), null).createReadyCohort(txId, mod, participatingShardNames);
}
@SuppressFBWarnings(value = "DB_DUPLICATE_SWITCH_CLAUSES", justification = "See inline comments below.")
import com.google.common.base.MoreObjects.ToStringHelper;
import com.google.common.primitives.UnsignedLong;
import com.google.common.util.concurrent.FutureCallback;
+import java.util.Optional;
+import java.util.SortedSet;
import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
import org.opendaylight.yangtools.concepts.Identifiable;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
abstract DataTreeModification getDataTreeModification();
+ abstract Optional<SortedSet<String>> getParticipatingShardNames();
+
// FIXME: Should return rebased DataTreeCandidateTip
@VisibleForTesting
public abstract void canCommit(FutureCallback<Void> callback);
import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
+import java.util.Optional;
+import java.util.SortedSet;
import javax.annotation.concurrent.NotThreadSafe;
import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
}
@Override
- ShardDataTreeCohort finishTransaction(final ReadWriteShardDataTreeTransaction transaction) {
+ ShardDataTreeCohort finishTransaction(final ReadWriteShardDataTreeTransaction transaction,
+ final Optional<SortedSet<String>> participatingShardNames) {
Preconditions.checkState(openTransaction != null,
"Attempted to finish transaction %s while none is outstanding", transaction);
// dataTree is finalizing ready the transaction, we just record it for the next
// transaction in chain
- final ShardDataTreeCohort delegate = dataTree.finishTransaction(transaction);
+ final ShardDataTreeCohort delegate = dataTree.finishTransaction(transaction, participatingShardNames);
openTransaction = null;
previousTx = transaction;
LOG.debug("Committing transaction {}", transaction);
}
@Override
- ShardDataTreeCohort createReadyCohort(final TransactionIdentifier txId, final DataTreeModification mod) {
- return dataTree.createReadyCohort(txId, mod);
+ ShardDataTreeCohort createReadyCohort(final TransactionIdentifier txId, final DataTreeModification mod,
+ final Optional<SortedSet<String>> participatingShardNames) {
+ return dataTree.createReadyCohort(txId, mod, participatingShardNames);
}
}
*/
package org.opendaylight.controller.cluster.datastore;
+import java.util.Optional;
+import java.util.SortedSet;
import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
abstract void abortTransaction(AbstractShardDataTreeTransaction<?> transaction, Runnable callback);
- abstract ShardDataTreeCohort finishTransaction(ReadWriteShardDataTreeTransaction transaction);
+ abstract ShardDataTreeCohort finishTransaction(ReadWriteShardDataTreeTransaction transaction,
+ Optional<SortedSet<String>> participatingShardNames);
- abstract ShardDataTreeCohort createReadyCohort(TransactionIdentifier txId, DataTreeModification mod);
+ abstract ShardDataTreeCohort createReadyCohort(TransactionIdentifier txId, DataTreeModification mod,
+ Optional<SortedSet<String>> participatingShardNames);
abstract ShardDataTreeCohort createFailedCohort(TransactionIdentifier txId, DataTreeModification mod,
Exception failure);
totalBatchedModificationsReceived, batched.getTotalMessagesSent()));
}
- readyTransaction(false, batched.isDoCommitOnReady(), batched.getVersion());
+ readyTransaction(batched);
} else {
getSender().tell(new BatchedModificationsReply(batched.getModifications().size()), getSelf());
}
}
}
- private void readyTransaction(boolean returnSerialized, boolean doImmediateCommit, short clientTxVersion) {
+ private void readyTransaction(BatchedModifications batched) {
TransactionIdentifier transactionID = getTransactionId();
LOG.debug("readyTransaction : {}", transactionID);
- getShardActor().forward(new ForwardedReadyTransaction(transactionID, clientTxVersion,
- transaction, doImmediateCommit), getContext());
+ getShardActor().forward(new ForwardedReadyTransaction(transactionID, batched.getVersion(),
+ transaction, batched.isDoCommitOnReady(), batched.getParticipatingShardNames()), getContext());
// The shard will handle the commit from here so we're no longer needed - self-destruct.
getSelf().tell(PoisonPill.getInstance(), getSelf());
*/
package org.opendaylight.controller.cluster.datastore;
+import static java.util.Objects.requireNonNull;
+
import com.google.common.base.MoreObjects.ToStringHelper;
import com.google.common.base.Preconditions;
import com.google.common.base.Verify;
import com.google.common.primitives.UnsignedLong;
import com.google.common.util.concurrent.FutureCallback;
import java.util.Optional;
+import java.util.SortedSet;
import java.util.concurrent.CompletionStage;
+import javax.annotation.Nullable;
import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateTip;
private final ShardDataTree dataTree;
private final TransactionIdentifier transactionId;
private final CompositeDataTreeCohort userCohorts;
+ @Nullable
+ private final SortedSet<String> participatingShardNames;
private State state = State.READY;
private DataTreeCandidateTip candidate;
private Exception nextFailure;
SimpleShardDataTreeCohort(final ShardDataTree dataTree, final DataTreeModification transaction,
- final TransactionIdentifier transactionId, final CompositeDataTreeCohort userCohorts) {
- this.dataTree = Preconditions.checkNotNull(dataTree);
- this.transaction = Preconditions.checkNotNull(transaction);
- this.transactionId = Preconditions.checkNotNull(transactionId);
- this.userCohorts = Preconditions.checkNotNull(userCohorts);
+ final TransactionIdentifier transactionId, final CompositeDataTreeCohort userCohorts,
+ final Optional<SortedSet<String>> participatingShardNames) {
+ this.dataTree = requireNonNull(dataTree);
+ this.transaction = requireNonNull(transaction);
+ this.transactionId = requireNonNull(transactionId);
+ this.userCohorts = requireNonNull(userCohorts);
+ this.participatingShardNames = requireNonNull(participatingShardNames).orElse(null);
}
SimpleShardDataTreeCohort(final ShardDataTree dataTree, final DataTreeModification transaction,
final TransactionIdentifier transactionId, final Exception nextFailure) {
- this.dataTree = Preconditions.checkNotNull(dataTree);
- this.transaction = Preconditions.checkNotNull(transaction);
- this.transactionId = Preconditions.checkNotNull(transactionId);
+ this.dataTree = requireNonNull(dataTree);
+ this.transaction = requireNonNull(transaction);
+ this.transactionId = requireNonNull(transactionId);
this.userCohorts = null;
- this.nextFailure = Preconditions.checkNotNull(nextFailure);
+ this.participatingShardNames = null;
+ this.nextFailure = requireNonNull(nextFailure);
}
@Override
return transaction;
}
+ @Override
+ // The field is stored as a @Nullable SortedSet (null when absent or when the
+ // failed-cohort constructor was used); re-wrap it into an Optional here.
+ Optional<SortedSet<String>> getParticipatingShardNames() {
+ return Optional.ofNullable(participatingShardNames);
+ }
+
private void checkState(final State expected) {
- Preconditions.checkState(state == expected, "State %s does not match expected state %s", state, expected);
+ Preconditions.checkState(state == expected, "State %s does not match expected state %s for %s",
+ state, expected, getIdentifier());
}
@Override
}
checkState(State.READY);
- this.callback = Preconditions.checkNotNull(newCallback);
+ this.callback = requireNonNull(newCallback);
state = State.CAN_COMMIT_PENDING;
if (nextFailure == null) {
@Override
public void preCommit(final FutureCallback<DataTreeCandidate> newCallback) {
checkState(State.CAN_COMMIT_COMPLETE);
- this.callback = Preconditions.checkNotNull(newCallback);
+ this.callback = requireNonNull(newCallback);
state = State.PRE_COMMIT_PENDING;
if (nextFailure == null) {
@Override
public void commit(final FutureCallback<UnsignedLong> newCallback) {
checkState(State.PRE_COMMIT_COMPLETE);
- this.callback = Preconditions.checkNotNull(newCallback);
+ this.callback = requireNonNull(newCallback);
state = State.COMMIT_PENDING;
if (nextFailure == null) {
void reportFailure(final Exception cause) {
if (nextFailure == null) {
- this.nextFailure = Preconditions.checkNotNull(cause);
+ this.nextFailure = requireNonNull(cause);
} else {
LOG.debug("Transaction {} already has a set failure, not updating it", transactionId, cause);
}
import com.google.common.primitives.UnsignedLong;
import java.util.HashMap;
import java.util.Map;
+import java.util.Optional;
+import java.util.SortedSet;
import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
import org.opendaylight.controller.cluster.access.concepts.RequestException;
}
@Override
- ShardDataTreeCohort createReadyCohort(final TransactionIdentifier id, final DataTreeModification mod) {
- return tree.createReadyCohort(id, mod);
+ ShardDataTreeCohort createReadyCohort(final TransactionIdentifier id, final DataTreeModification mod,
+ final Optional<SortedSet<String>> participatingShardNames) {
+ return tree.createReadyCohort(id, mod, participatingShardNames);
}
}
for (CohortInfo cohort : cohorts) {
Object message = messageSupplier.newMessage(transactionId, cohort.getActorVersion());
- LOG.debug("Tx {}: Sending {} to cohort {}", transactionId, message , cohort);
+ LOG.debug("Tx {}: Sending {} to cohort {}", transactionId, message , cohort.getResolvedActor());
futureList.add(actorContext.executeOperationAsync(cohort.getResolvedActor(), message,
actorContext.getTransactionCommitOperationTimeout()));
import akka.actor.ActorSelection;
import com.google.common.util.concurrent.SettableFuture;
+import java.util.Optional;
+import java.util.SortedSet;
import org.opendaylight.controller.cluster.datastore.messages.AbstractRead;
import org.opendaylight.controller.cluster.datastore.modification.AbstractModification;
import scala.concurrent.Future;
interface TransactionContext {
void closeTransaction();
- Future<ActorSelection> readyTransaction(Boolean havePermit);
+ Future<ActorSelection> readyTransaction(Boolean havePermit, Optional<SortedSet<String>> participatingShardNames);
void executeModification(AbstractModification modification, Boolean havePermit);
import java.util.Collection;
import java.util.List;
import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.SortedSet;
import java.util.concurrent.TimeUnit;
import javax.annotation.concurrent.GuardedBy;
import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
}
}
- Future<ActorSelection> readyTransaction() {
+ Future<ActorSelection> readyTransaction(Optional<SortedSet<String>> participatingShardNames) {
// avoid the creation of a promise and a TransactionOperation
final TransactionContext localContext = transactionContext;
if (localContext != null) {
- return localContext.readyTransaction(null);
+ return localContext.readyTransaction(null, participatingShardNames);
}
final Promise<ActorSelection> promise = Futures.promise();
enqueueTransactionOperation(new TransactionOperation() {
@Override
public void invoke(final TransactionContext newTransactionContext, final Boolean havePermit) {
- promise.completeWith(newTransactionContext.readyTransaction(havePermit));
+ promise.completeWith(newTransactionContext.readyTransaction(havePermit, participatingShardNames));
}
});
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
+import java.util.SortedSet;
import java.util.TreeMap;
+import java.util.TreeSet;
import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.messages.AbstractRead;
import org.opendaylight.controller.cluster.datastore.messages.DataExists;
private static final Logger LOG = LoggerFactory.getLogger(TransactionProxy.class);
- // Global lock used for transactions spanning multiple shards - synchronizes sending of the ready messages
- // for atomicity to avoid potential deadlock with concurrent transactions spanning the same shards as outlined
- // in the following scenario:
- //
- // - Tx1 sends ready message to shard A
- // - Tx2 sends ready message to shard A
- // - Tx2 sends ready message to shard B
- // - Tx1 sends ready message to shard B
- //
- // This scenario results in deadlock: after Tx1 canCommits to shard A, it can't proceed with shard B until Tx2
- // completes as Tx2 was readied first on shard B. However Tx2 cannot make progress because it's waiting to canCommit
- // on shard A which is blocked by Tx1.
- //
- // The global lock avoids this as it forces the ready messages to be sent in a predictable order:
- //
- // - Tx1 sends ready message to shard A
- // - Tx1 sends ready message to shard B
- // - Tx2 sends ready message to shard A
- // - Tx2 sends ready message to shard B
- //
- private static final Object GLOBAL_TX_READY_LOCK = new Object();
-
private final Map<String, TransactionContextWrapper> txContextWrappers = new TreeMap<>();
private final AbstractTransactionContextFactory<?> txContextFactory;
private final TransactionType type;
ret = createSingleCommitCohort(e.getKey(), e.getValue());
break;
default:
- ret = createMultiCommitCohort(txContextWrappers.entrySet());
+ ret = createMultiCommitCohort();
}
txContextFactory.onTransactionReady(getIdentifier(), ret.getCohortFutures());
return transactionContext.directCommit(havePermit);
}
- private AbstractThreePhaseCommitCohort<ActorSelection> createMultiCommitCohort(
- final Set<Entry<String, TransactionContextWrapper>> txContextWrapperEntries) {
-
- final List<ThreePhaseCommitCohortProxy.CohortInfo> cohorts = new ArrayList<>(txContextWrapperEntries.size());
+ private AbstractThreePhaseCommitCohort<ActorSelection> createMultiCommitCohort() {
- synchronized (GLOBAL_TX_READY_LOCK) {
- for (Entry<String, TransactionContextWrapper> e : txContextWrapperEntries) {
- LOG.debug("Tx {} Readying transaction for shard {}", getIdentifier(), e.getKey());
+ final List<ThreePhaseCommitCohortProxy.CohortInfo> cohorts = new ArrayList<>(txContextWrappers.size());
+ final java.util.Optional<SortedSet<String>> shardNames =
+ java.util.Optional.of(new TreeSet<>(txContextWrappers.keySet()));
+ for (Entry<String, TransactionContextWrapper> e : txContextWrappers.entrySet()) {
+ LOG.debug("Tx {} Readying transaction for shard {}", getIdentifier(), e.getKey());
- final TransactionContextWrapper wrapper = e.getValue();
+ final TransactionContextWrapper wrapper = e.getValue();
- // The remote tx version is obtained the via TransactionContext which may not be available yet so
- // we pass a Supplier to dynamically obtain it. Once the ready Future is resolved the
- // TransactionContext is available.
- Supplier<Short> txVersionSupplier = () -> wrapper.getTransactionContext().getTransactionVersion();
+ // The remote tx version is obtained via the TransactionContext which may not be available yet so
+ // we pass a Supplier to dynamically obtain it. Once the ready Future is resolved the
+ // TransactionContext is available.
+ Supplier<Short> txVersionSupplier = () -> wrapper.getTransactionContext().getTransactionVersion();
- cohorts.add(new ThreePhaseCommitCohortProxy.CohortInfo(wrapper.readyTransaction(), txVersionSupplier));
- }
+ cohorts.add(new ThreePhaseCommitCohortProxy.CohortInfo(wrapper.readyTransaction(shardNames),
+ txVersionSupplier));
}
return new ThreePhaseCommitCohortProxy(txContextFactory.getActorContext(), cohorts, getIdentifier());
BatchedModifications prunedModifications = new BatchedModifications(toPrune.getTransactionId(),
toPrune.getVersion());
prunedModifications.setDoCommitOnReady(toPrune.isDoCommitOnReady());
- prunedModifications.setReady(toPrune.isReady());
+ if (toPrune.isReady()) {
+ prunedModifications.setReady(toPrune.getParticipatingShardNames());
+ }
prunedModifications.setTotalMessagesSent(toPrune.getTotalMessagesSent());
for (Modification mod: toPrune.getModifications()) {
if (canForwardModificationToNewLeader(mod)) {
BatchedModifications modifications = new BatchedModifications(
new TransactionIdentifier(historyId, ++transactionIDCounter), DataStoreVersions.CURRENT_VERSION);
modifications.setDoCommitOnReady(true);
- modifications.setReady(true);
+ modifications.setReady();
modifications.setTotalMessagesSent(1);
return modifications;
}
*/
package org.opendaylight.controller.cluster.datastore.messages;
+import static java.util.Objects.requireNonNull;
+
import com.google.common.base.Preconditions;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
+import java.util.Optional;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import javax.annotation.Nullable;
import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
/**
private boolean doCommitOnReady;
private int totalMessagesSent;
private TransactionIdentifier transactionId;
+ @Nullable
+ private SortedSet<String> participatingShardNames;
public BatchedModifications() {
}
public BatchedModifications(TransactionIdentifier transactionId, short version) {
super(version);
- this.transactionId = Preconditions.checkNotNull(transactionId, "transactionID can't be null");
+ this.transactionId = requireNonNull(transactionId, "transactionID can't be null");
}
public boolean isReady() {
return ready;
}
- public void setReady(boolean ready) {
- this.ready = ready;
+ /**
+ * Marks this batch as ready, optionally recording the names of all shards participating in the
+ * transaction (presumably used for cross-shard commit coordination - see the coordination tests
+ * elsewhere in this change).
+ *
+ * @param possibleParticipatingShardNames if present, must contain more than one shard name - a
+ *        single-shard transaction has no other participants, hence the checkArgument below
+ */
+ public void setReady(Optional<SortedSet<String>> possibleParticipatingShardNames) {
+ this.ready = true;
+ // Absent is stored as null internally so the serialized form can treat "no names" uniformly.
+ this.participatingShardNames = requireNonNull(possibleParticipatingShardNames).orElse(null);
+ Preconditions.checkArgument(this.participatingShardNames == null || this.participatingShardNames.size() > 1);
+ }
+
+ /**
+ * Marks this batch as ready with no participating shard names.
+ */
+ public void setReady() {
+ setReady(Optional.empty());
+ }
+
+ /**
+ * Returns the participating shard names, if any were supplied via {@link #setReady(Optional)}.
+ */
+ public Optional<SortedSet<String>> getParticipatingShardNames() {
+ return Optional.ofNullable(participatingShardNames);
+ }
public boolean isDoCommitOnReady() {
return transactionId;
}
-
@Override
public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
super.readExternal(in);
ready = in.readBoolean();
totalMessagesSent = in.readInt();
doCommitOnReady = in.readBoolean();
+
+ if (getVersion() >= DataStoreVersions.FLUORINE_VERSION) {
+ final int count = in.readInt();
+ if (count != 0) {
+ SortedSet<String> shardNames = new TreeSet<>();
+ for (int i = 0; i < count; i++) {
+ shardNames.add((String) in.readObject());
+ }
+
+ participatingShardNames = shardNames;
+ }
+ }
}
@Override
out.writeBoolean(ready);
out.writeInt(totalMessagesSent);
out.writeBoolean(doCommitOnReady);
+
+ if (getVersion() >= DataStoreVersions.FLUORINE_VERSION) {
+ if (participatingShardNames != null) {
+ out.writeInt(participatingShardNames.size());
+ for (String shardName: participatingShardNames) {
+ out.writeObject(shardName);
+ }
+ } else {
+ out.writeInt(0);
+ }
+ }
}
@Override
public String toString() {
return "BatchedModifications [transactionId=" + transactionId
- + ", ready=" + ready
+ + ", ready=" + isReady()
+ + ", participatingShardNames=" + participatingShardNames
+ ", totalMessagesSent=" + totalMessagesSent
+ ", modifications size=" + getModifications().size() + "]";
}
*/
package org.opendaylight.controller.cluster.datastore.messages;
-import com.google.common.base.Preconditions;
+import static java.util.Objects.requireNonNull;
+
+import java.util.Optional;
+import java.util.SortedSet;
+import javax.annotation.Nullable;
import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.ReadWriteShardDataTreeTransaction;
private final ReadWriteShardDataTreeTransaction transaction;
private final boolean doImmediateCommit;
private final short txnClientVersion;
+ @Nullable
+ private final SortedSet<String> participatingShardNames;
public ForwardedReadyTransaction(TransactionIdentifier transactionId, short txnClientVersion,
- ReadWriteShardDataTreeTransaction transaction, boolean doImmediateCommit) {
- this.transactionId = Preconditions.checkNotNull(transactionId);
- this.transaction = Preconditions.checkNotNull(transaction);
+ ReadWriteShardDataTreeTransaction transaction, boolean doImmediateCommit,
+ Optional<SortedSet<String>> participatingShardNames) {
+ this.transactionId = requireNonNull(transactionId);
+ this.transaction = requireNonNull(transaction);
this.txnClientVersion = txnClientVersion;
this.doImmediateCommit = doImmediateCommit;
+ this.participatingShardNames = requireNonNull(participatingShardNames).orElse(null);
}
public TransactionIdentifier getTransactionId() {
return doImmediateCommit;
}
+ public Optional<SortedSet<String>> getParticipatingShardNames() {
+ return Optional.ofNullable(participatingShardNames);
+ }
+
@Override
public String toString() {
- return "ForwardedReadyTransaction [transactionId=" + transactionId + ", doImmediateCommit=" + doImmediateCommit
+ return "ForwardedReadyTransaction [transactionId=" + transactionId + ", transaction=" + transaction
+ + ", doImmediateCommit=" + doImmediateCommit + ", participatingShardNames=" + participatingShardNames
+ ", txnClientVersion=" + txnClientVersion + "]";
}
}
*/
package org.opendaylight.controller.cluster.datastore.messages;
-import com.google.common.base.Preconditions;
+import static java.util.Objects.requireNonNull;
+
+import java.util.Optional;
+import java.util.SortedSet;
+import javax.annotation.Nullable;
import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
private final DataTreeModification modification;
private final TransactionIdentifier transactionId;
private final boolean doCommitOnReady;
+ @Nullable
+ private final SortedSet<String> participatingShardNames;
// The version of the remote system used only when needing to convert to BatchedModifications.
private short remoteVersion = DataStoreVersions.CURRENT_VERSION;
public ReadyLocalTransaction(final TransactionIdentifier transactionId, final DataTreeModification modification,
- final boolean doCommitOnReady) {
- this.transactionId = Preconditions.checkNotNull(transactionId);
- this.modification = Preconditions.checkNotNull(modification);
+ final boolean doCommitOnReady, Optional<SortedSet<String>> participatingShardNames) {
+ this.transactionId = requireNonNull(transactionId);
+ this.modification = requireNonNull(modification);
this.doCommitOnReady = doCommitOnReady;
+ this.participatingShardNames = requireNonNull(participatingShardNames).orElse(null);
}
public TransactionIdentifier getTransactionId() {
public void setRemoteVersion(short remoteVersion) {
this.remoteVersion = remoteVersion;
}
+
+ public Optional<SortedSet<String>> getParticipatingShardNames() {
+ return Optional.ofNullable(participatingShardNames);
+ }
}
readyLocal.getRemoteVersion());
batched.setDoCommitOnReady(readyLocal.isDoCommitOnReady());
batched.setTotalMessagesSent(1);
- batched.setReady(true);
+ batched.setReady(readyLocal.getParticipatingShardNames());
readyLocal.getModification().applyToCursor(new BatchedCursor(batched));
import java.util.Map;
import java.util.Optional;
import java.util.Set;
+import java.util.SortedSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
final boolean doCommitOnReady) {
final BatchedModifications batchedModifications = new BatchedModifications(transactionID, CURRENT_VERSION);
batchedModifications.addModification(modification);
- batchedModifications.setReady(true);
+ batchedModifications.setReady();
batchedModifications.setDoCommitOnReady(doCommitOnReady);
batchedModifications.setTotalMessagesSent(1);
return batchedModifications;
ReadWriteShardDataTreeTransaction rwTx = shard.underlyingActor().getDataStore()
.newReadWriteTransaction(transactionID);
rwTx.getSnapshot().write(path, data);
- return new ForwardedReadyTransaction(transactionID, CURRENT_VERSION, rwTx, doCommitOnReady);
+ return new ForwardedReadyTransaction(transactionID, CURRENT_VERSION, rwTx, doCommitOnReady, Optional.empty());
}
public static NormalizedNode<?,?> readStore(final TestActorRef<? extends Shard> shard,
final NormalizedNode<?,?> node) throws DataValidationFailedException {
final BatchedModifications batched = new BatchedModifications(nextTransactionId(), CURRENT_VERSION);
batched.addModification(new MergeModification(id, node));
- batched.setReady(true);
+ batched.setReady();
batched.setDoCommitOnReady(true);
batched.setTotalMessagesSent(1);
final boolean doCommitOnReady, final int messagesSent) {
final BatchedModifications batched = new BatchedModifications(transactionID, CURRENT_VERSION);
batched.addModification(new WriteModification(path, data));
- batched.setReady(ready);
+ if (ready) {
+ batched.setReady();
+ }
batched.setDoCommitOnReady(doCommitOnReady);
batched.setTotalMessagesSent(messagesSent);
return batched;
}
+ static BatchedModifications newReadyBatchedModifications(final TransactionIdentifier transactionID,
+ final YangInstanceIdentifier path, final NormalizedNode<?, ?> data,
+ final SortedSet<String> participatingShardNames) {
+ final BatchedModifications batched = new BatchedModifications(transactionID, CURRENT_VERSION);
+ batched.addModification(new WriteModification(path, data));
+ batched.setReady(Optional.of(participatingShardNames));
+ batched.setTotalMessagesSent(1);
+ return batched;
+ }
+
@SuppressWarnings("unchecked")
static void verifyOuterListEntry(final TestActorRef<Shard> shard, final Object expIDValue) throws Exception {
final NormalizedNode<?, ?> outerList = readStore(shard, TestModel.OUTER_LIST_PATH);
public State getState() {
return delegate.getState();
}
+
+ @Override
+ Optional<SortedSet<String>> getParticipatingShardNames() {
+ return delegate.getParticipatingShardNames();
+ }
}
}
private final Configuration configuration = new MockConfiguration() {
Map<String, ShardStrategy> strategyMap = ImmutableMap.<String, ShardStrategy>builder().put(
- "junk", new ShardStrategy() {
+ TestModel.JUNK_QNAME.getLocalName(), new ShardStrategy() {
@Override
public String findShard(final YangInstanceIdentifier path) {
- return "junk";
+ return TestModel.JUNK_QNAME.getLocalName();
}
@Override
return YangInstanceIdentifier.EMPTY;
}
}).put(
- "cars", new ShardStrategy() {
+ CarsModel.BASE_QNAME.getLocalName(), new ShardStrategy() {
@Override
public String findShard(final YangInstanceIdentifier path) {
- return "cars";
+ return CarsModel.BASE_QNAME.getLocalName();
}
@Override
@Override
public String getModuleNameFromNameSpace(final String nameSpace) {
if (TestModel.JUNK_QNAME.getNamespace().toASCIIString().equals(nameSpace)) {
- return "junk";
+ return TestModel.JUNK_QNAME.getLocalName();
} else if (CarsModel.BASE_QNAME.getNamespace().toASCIIString().equals(nameSpace)) {
- return "cars";
+ return CarsModel.BASE_QNAME.getLocalName();
}
return null;
}
new WriteModification(CarsModel.newCarPath("optima"), car1).apply(modification);
modification.ready();
- ReadyLocalTransaction readyLocal = new ReadyLocalTransaction(tx1 , modification, true);
+ ReadyLocalTransaction readyLocal = new ReadyLocalTransaction(tx1 , modification, true,
+ java.util.Optional.empty());
carsFollowerShard.get().tell(readyLocal, followerTestKit.getRef());
Object resp = followerTestKit.expectMsgClass(Object.class);
new WriteModification(CarsModel.newCarPath("sportage"), car2).apply(modification);
modification.ready();
- readyLocal = new ReadyLocalTransaction(tx2 , modification, false);
+ readyLocal = new ReadyLocalTransaction(tx2 , modification, false, java.util.Optional.empty());
carsFollowerShard.get().tell(readyLocal, followerTestKit.getRef());
resp = followerTestKit.expectMsgClass(Object.class);
ForwardedReadyTransaction forwardedReady = new ForwardedReadyTransaction(tx1,
DataStoreVersions.CURRENT_VERSION, new ReadWriteShardDataTreeTransaction(
- Mockito.mock(ShardDataTreeTransactionParent.class), tx1, modification), true);
+ Mockito.mock(ShardDataTreeTransactionParent.class), tx1, modification), true,
+ java.util.Optional.empty());
carsFollowerShard.get().tell(forwardedReady, followerTestKit.getRef());
Object resp = followerTestKit.expectMsgClass(Object.class);
forwardedReady = new ForwardedReadyTransaction(tx2,
DataStoreVersions.CURRENT_VERSION, new ReadWriteShardDataTreeTransaction(
- Mockito.mock(ShardDataTreeTransactionParent.class), tx2, modification), false);
+ Mockito.mock(ShardDataTreeTransactionParent.class), tx2, modification), false,
+ java.util.Optional.empty());
carsFollowerShard.get().tell(forwardedReady, followerTestKit.getRef());
resp = followerTestKit.expectMsgClass(Object.class);
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
import static org.mockito.Matchers.same;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import akka.actor.ActorRef;
+import java.util.Optional;
import org.junit.Before;
import org.junit.Test;
import org.opendaylight.controller.cluster.access.commands.ModifyTransactionRequestBuilder;
shardTransaction = new ReadWriteShardDataTreeTransaction(mockParent, TX_ID, mockModification);
openTx = FrontendReadWriteTransaction.createOpen(mockHistory, shardTransaction);
- when(mockParent.finishTransaction(same(shardTransaction))).thenReturn(mockCohort);
+ when(mockParent.finishTransaction(same(shardTransaction), eq(Optional.empty()))).thenReturn(mockCohort);
}
private TransactionSuccess<?> handleRequest(final TransactionRequest<?> request) throws RequestException {
final TransactionRequest<?> readyReq = b.build();
assertNotNull(handleRequest(readyReq));
- verify(mockParent).finishTransaction(same(shardTransaction));
+ verify(mockParent).finishTransaction(same(shardTransaction), eq(Optional.empty()));
assertNotNull(handleRequest(readyReq));
verifyNoMoreInteractions(mockParent);
final TransactionRequest<?> readyReq = b.build();
assertNull(handleRequest(readyReq));
- verify(mockParent).finishTransaction(same(shardTransaction));
+ verify(mockParent).finishTransaction(same(shardTransaction), eq(Optional.empty()));
assertNull(handleRequest(readyReq));
verifyNoMoreInteractions(mockParent);
final TransactionRequest<?> readyReq = b.build();
assertNull(handleRequest(readyReq));
- verify(mockParent).finishTransaction(same(shardTransaction));
+ verify(mockParent).finishTransaction(same(shardTransaction), eq(Optional.empty()));
assertNull(handleRequest(readyReq));
verifyNoMoreInteractions(mockParent);
final TransactionRequest<?> readyReq = b.build();
assertNotNull(handleRequest(readyReq));
- verify(mockParent).finishTransaction(same(shardTransaction));
+ verify(mockParent).finishTransaction(same(shardTransaction), eq(Optional.empty()));
handleRequest(new ReadTransactionRequest(TX_ID, 0, mock(ActorRef.class), YangInstanceIdentifier.EMPTY, true));
}
final TransactionRequest<?> readyReq = b.build();
assertNotNull(handleRequest(readyReq));
- verify(mockParent).finishTransaction(same(shardTransaction));
+ verify(mockParent).finishTransaction(same(shardTransaction), eq(Optional.empty()));
b.setSequence(1);
b.addModification(mock(TransactionModification.class));
@Test
public void testReady() {
final LocalThreePhaseCommitCohort mockCohort = mock(LocalThreePhaseCommitCohort.class);
- doReturn(akka.dispatch.Futures.successful(null)).when(mockCohort).initiateCoordinatedCommit();
+ doReturn(akka.dispatch.Futures.successful(null)).when(mockCohort).initiateCoordinatedCommit(
+ java.util.Optional.empty());
doReturn(mockCohort).when(mockReadySupport).onTransactionReady(readWriteTransaction, null);
- Future<ActorSelection> future = localTransactionContext.readyTransaction(null);
+ Future<ActorSelection> future = localTransactionContext.readyTransaction(null, java.util.Optional.empty());
assertTrue(future.isCompleted());
verify(mockReadySupport).onTransactionReady(readWriteTransaction, null);
private void doReadyWithExpectedError(final RuntimeException expError) {
LocalThreePhaseCommitCohort mockCohort = mock(LocalThreePhaseCommitCohort.class);
- doReturn(akka.dispatch.Futures.successful(null)).when(mockCohort).initiateCoordinatedCommit();
+ doReturn(akka.dispatch.Futures.successful(null)).when(mockCohort).initiateCoordinatedCommit(
+ java.util.Optional.empty());
doReturn(mockCohort).when(mockReadySupport).onTransactionReady(readWriteTransaction, expError);
- localTransactionContext.readyTransaction(null);
+ localTransactionContext.readyTransaction(null, java.util.Optional.empty());
}
}
--- /dev/null
+/*
+ * Copyright (c) 2018 Inocybe Technologies and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import static org.junit.Assert.assertNotNull;
+import static org.opendaylight.controller.cluster.datastore.DataStoreVersions.CURRENT_VERSION;
+import static org.opendaylight.controller.md.cluster.datastore.model.TestModel.ID_QNAME;
+import static org.opendaylight.controller.md.cluster.datastore.model.TestModel.INNER_LIST_QNAME;
+import static org.opendaylight.controller.md.cluster.datastore.model.TestModel.NAME_QNAME;
+import static org.opendaylight.controller.md.cluster.datastore.model.TestModel.OUTER_LIST_PATH;
+import static org.opendaylight.controller.md.cluster.datastore.model.TestModel.OUTER_LIST_QNAME;
+import static org.opendaylight.controller.md.cluster.datastore.model.TestModel.TEST_PATH;
+import static org.opendaylight.controller.md.cluster.datastore.model.TestModel.TEST_QNAME;
+import static org.opendaylight.controller.md.cluster.datastore.model.TestModel.innerEntryPath;
+import static org.opendaylight.controller.md.cluster.datastore.model.TestModel.innerMapPath;
+import static org.opendaylight.controller.md.cluster.datastore.model.TestModel.innerNode;
+import static org.opendaylight.controller.md.cluster.datastore.model.TestModel.outerEntryPath;
+import static org.opendaylight.controller.md.cluster.datastore.model.TestModel.outerMapNode;
+import static org.opendaylight.controller.md.cluster.datastore.model.TestModel.outerNode;
+
+import akka.dispatch.Dispatchers;
+import akka.testkit.TestActorRef;
+import akka.testkit.javadsl.TestKit;
+import com.google.common.collect.ImmutableSortedSet;
+import java.util.SortedSet;
+import java.util.concurrent.TimeUnit;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.access.concepts.MemberName;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
+import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
+import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
+import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
+import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
+import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.duration.FiniteDuration;
+
+/**
+ * Unit tests for various 3PC coordination scenarios.
+ *
+ * @author Thomas Pantelis
+ */
+public class ShardCommitCoordinationTest extends AbstractShardTest {
+ private static final Logger LOG = LoggerFactory.getLogger(ShardCommitCoordinationTest.class);
+
+ /**
+ * Test 2 tx's accessing the same shards.
+ * <pre>
+ * tx1 -> shard A, shard B
+ * tx2 -> shard A, shard B
+ * </pre>
+ * The tx's are readied such that the pendingTransactions queues are as follows:
+ * <pre>
+ * Queue for shard A -> tx1, tx2
+ * Queue for shard B -> tx2, tx1
+ * </pre>
+ * This is a potential deadlock scenario (ABBA) which should be avoided by allowing tx1 to proceed on shard B
+ * even though it isn't at the head of the queues.
+ */
+ @Test
+ public void testTwoTransactionsWithSameTwoParticipatingShards() throws Exception {
+ final String testName = "testTwoTransactionsWithSameTwoParticipatingShards";
+ LOG.info("{} starting", testName);
+
+ final TestKit kit1 = new TestKit(getSystem());
+ final TestKit kit2 = new TestKit(getSystem());
+
+ final ShardIdentifier shardAId = ShardIdentifier.create("shardA", MemberName.forName(testName), "config");
+ final ShardIdentifier shardBId = ShardIdentifier.create("shardB", MemberName.forName(testName), "config");
+
+ final TestActorRef<Shard> shardA = actorFactory.createTestActor(
+ newShardBuilder().id(shardAId).props().withDispatcher(Dispatchers.DefaultDispatcherId()));
+ ShardTestKit.waitUntilLeader(shardA);
+
+ final TestActorRef<Shard> shardB = actorFactory.createTestActor(
+ newShardBuilder().id(shardBId).props().withDispatcher(Dispatchers.DefaultDispatcherId()));
+ ShardTestKit.waitUntilLeader(shardB);
+
+ final TransactionIdentifier txId1 = nextTransactionId();
+ final TransactionIdentifier txId2 = nextTransactionId();
+
+ // Both tx's declare the same two participating shards.
+ SortedSet<String> participatingShardNames = ImmutableSortedSet.of(shardAId.getShardName(),
+ shardBId.getShardName());
+
+ // Ready [tx1, tx2] on shard A.
+
+ shardA.tell(newReadyBatchedModifications(txId1, TEST_PATH,
+ ImmutableNodes.containerNode(TEST_QNAME), participatingShardNames), kit1.getRef());
+ kit1.expectMsgClass(ReadyTransactionReply.class);
+
+ shardA.tell(newReadyBatchedModifications(txId2, OUTER_LIST_PATH, outerNode(1),
+ participatingShardNames), kit2.getRef());
+ kit2.expectMsgClass(ReadyTransactionReply.class);
+
+ // Ready [tx2, tx1] on shard B - opposite order to shard A, setting up the ABBA scenario.
+
+ shardB.tell(newReadyBatchedModifications(txId2, OUTER_LIST_PATH, outerNode(1),
+ participatingShardNames), kit2.getRef());
+ kit2.expectMsgClass(ReadyTransactionReply.class);
+
+ shardB.tell(newReadyBatchedModifications(txId1, TEST_PATH,
+ ImmutableNodes.containerNode(TEST_QNAME), participatingShardNames), kit1.getRef());
+ kit1.expectMsgClass(ReadyTransactionReply.class);
+
+ // Send tx2 CanCommit to A - tx1 is at the head of the queue so tx2 should not proceed as A is the first shard
+ // in the participating shard list.
+
+ shardA.tell(new CanCommitTransaction(txId2, CURRENT_VERSION).toSerializable(), kit2.getRef());
+ kit2.expectNoMessage(FiniteDuration.create(100, TimeUnit.MILLISECONDS));
+
+ // Send tx1 CanCommit to A - it's at the head of the queue so should proceed.
+
+ shardA.tell(new CanCommitTransaction(txId1, CURRENT_VERSION).toSerializable(), kit1.getRef());
+ kit1.expectMsgClass(CanCommitTransactionReply.class);
+
+ // Send tx1 CanCommit to B - tx2 is at the head of the queue but the preceding shards in tx1's participating
+ // shard list [A] match those of tx2 [A] so tx1 should be de-queued and allowed to proceed.
+
+ shardB.tell(new CanCommitTransaction(txId1, CURRENT_VERSION).toSerializable(), kit1.getRef());
+ kit1.expectMsgClass(CanCommitTransactionReply.class);
+
+ // Send tx2 CanCommit to B - tx1 should now be at the head of the queue.
+
+ shardB.tell(new CanCommitTransaction(txId2, CURRENT_VERSION).toSerializable(), kit2.getRef());
+ kit2.expectNoMessage(FiniteDuration.create(100, TimeUnit.MILLISECONDS));
+
+ // Finish commit of tx1.
+
+ shardA.tell(new CommitTransaction(txId1, CURRENT_VERSION).toSerializable(), kit1.getRef());
+ kit1.expectMsgClass(CommitTransactionReply.class);
+
+ shardB.tell(new CommitTransaction(txId1, CURRENT_VERSION).toSerializable(), kit1.getRef());
+ kit1.expectMsgClass(CommitTransactionReply.class);
+
+ // Finish commit of tx2 - tx2's deferred CanCommit replies (one from each shard) should now arrive.
+
+ kit2.expectMsgClass(CanCommitTransactionReply.class);
+ kit2.expectMsgClass(CanCommitTransactionReply.class);
+
+ shardA.tell(new CommitTransaction(txId2, CURRENT_VERSION).toSerializable(), kit2.getRef());
+ kit2.expectMsgClass(CommitTransactionReply.class);
+
+ shardB.tell(new CommitTransaction(txId2, CURRENT_VERSION).toSerializable(), kit2.getRef());
+ kit2.expectMsgClass(CommitTransactionReply.class);
+
+ // Verify data in the data store.
+
+ verifyOuterListEntry(shardA, 1);
+ verifyOuterListEntry(shardB, 1);
+
+ LOG.info("{} ending", testName);
+ }
+
+ /**
+ * Test multiple tx's accessing a mix of same and differing shards.
+ * <pre>
+ * tx1 -> shard X, shard B
+ * tx2 -> shard X, shard B
+ * tx3 -> shard A, shard B
+ * tx4 -> shard A, shard B
+ * tx5 -> shard A, shard B
+ * </pre>
+ * The tx's are readied such the pendingTransactions queue are as follows:
+ * <pre>
+ * Queue for shard A -> tx3, tx4, tx5
+ * Queue for shard B -> tx1, tx2, tx5, tx4, tx3
+ * </pre>
+ * Note: shard X means any other shard which isn't relevant for the test.
+ * This is a potential deadlock scenario (ABBA) which should be avoided by moving tx3 ahead of tx5 on shard B when
+ * CanCommit is requested.
+ */
+ @Test
+ public void testMultipleTransactionsWithMixedParticipatingShards() throws Exception {
+ final String testName = "testMultipleTransactionsWithMixedParticipatingShards";
+ LOG.info("{} starting", testName);
+
+ final TestKit kit1 = new TestKit(getSystem());
+ final TestKit kit2 = new TestKit(getSystem());
+ final TestKit kit3 = new TestKit(getSystem());
+ final TestKit kit4 = new TestKit(getSystem());
+ final TestKit kit5 = new TestKit(getSystem());
+
+ final ShardIdentifier shardAId = ShardIdentifier.create("shardA", MemberName.forName(testName), "config");
+ final ShardIdentifier shardBId = ShardIdentifier.create("shardB", MemberName.forName(testName), "config");
+
+ final TestActorRef<Shard> shardA = actorFactory.createTestActor(
+ newShardBuilder().id(shardAId).props().withDispatcher(Dispatchers.DefaultDispatcherId()));
+ ShardTestKit.waitUntilLeader(shardA);
+
+ final TestActorRef<Shard> shardB = actorFactory.createTestActor(
+ newShardBuilder().id(shardBId).props().withDispatcher(Dispatchers.DefaultDispatcherId()));
+ ShardTestKit.waitUntilLeader(shardB);
+
+ final TransactionIdentifier txId1 = nextTransactionId();
+ final TransactionIdentifier txId2 = nextTransactionId();
+ final TransactionIdentifier txId3 = nextTransactionId();
+ final TransactionIdentifier txId4 = nextTransactionId();
+ final TransactionIdentifier txId5 = nextTransactionId();
+
+ final SortedSet<String> participatingShardNames1 = ImmutableSortedSet.of(shardAId.getShardName(),
+ shardBId.getShardName());
+ final SortedSet<String> participatingShardNames2 = ImmutableSortedSet.of("shardX", shardBId.getShardName());
+
+ // Ready [tx3, tx4, tx5] on shard A.
+
+ shardA.tell(newReadyBatchedModifications(txId3, TEST_PATH,
+ ImmutableNodes.containerNode(TEST_QNAME), participatingShardNames1), kit3.getRef());
+ kit3.expectMsgClass(ReadyTransactionReply.class);
+
+ shardA.tell(newReadyBatchedModifications(txId4, OUTER_LIST_PATH, outerMapNode(),
+ participatingShardNames1), kit4.getRef());
+ kit4.expectMsgClass(ReadyTransactionReply.class);
+
+ shardA.tell(newReadyBatchedModifications(txId5, outerEntryPath(1),
+ ImmutableNodes.mapEntry(OUTER_LIST_QNAME, ID_QNAME, 1), participatingShardNames1), kit5.getRef());
+ kit5.expectMsgClass(ReadyTransactionReply.class);
+
+ // Ready [tx1, tx2, tx5, tx4, tx3] on shard B.
+
+ shardB.tell(newReadyBatchedModifications(txId1, TEST_PATH,
+ ImmutableNodes.containerNode(TEST_QNAME), participatingShardNames2), kit1.getRef());
+ kit1.expectMsgClass(ReadyTransactionReply.class);
+
+ shardB.tell(newReadyBatchedModifications(txId2, OUTER_LIST_PATH, outerMapNode(),
+ participatingShardNames2), kit2.getRef());
+ kit2.expectMsgClass(ReadyTransactionReply.class);
+
+ shardB.tell(newReadyBatchedModifications(txId5, innerEntryPath(1, "one"),
+ ImmutableNodes.mapEntry(INNER_LIST_QNAME, NAME_QNAME, "one"), participatingShardNames1), kit5.getRef());
+ kit5.expectMsgClass(ReadyTransactionReply.class);
+
+ shardB.tell(newReadyBatchedModifications(txId4, innerMapPath(1), innerNode(),
+ participatingShardNames1), kit4.getRef());
+ kit4.expectMsgClass(ReadyTransactionReply.class);
+
+ shardB.tell(newReadyBatchedModifications(txId3, outerEntryPath(1),
+ ImmutableNodes.mapEntry(OUTER_LIST_QNAME, ID_QNAME, 1), participatingShardNames1), kit3.getRef());
+ kit3.expectMsgClass(ReadyTransactionReply.class);
+
+ // Send tx3 CanCommit to A - it's at the head of the queue so should proceed.
+
+ shardA.tell(new CanCommitTransaction(txId3, CURRENT_VERSION).toSerializable(), kit3.getRef());
+ kit3.expectMsgClass(CanCommitTransactionReply.class);
+
+ // Send tx1 CanCommit to B - it's at the head of the queue so should proceed.
+
+ shardB.tell(new CanCommitTransaction(txId1, CURRENT_VERSION).toSerializable(), kit1.getRef());
+ kit1.expectMsgClass(CanCommitTransactionReply.class);
+
+ // Send tx3 CanCommit to B - tx1 is at the head of the queue but the preceding shards in tx3's participating
+ // shard list [A] match those of tx5 so tx3 should be moved ahead of tx5 in the queue.
+
+ shardB.tell(new CanCommitTransaction(txId3, CURRENT_VERSION).toSerializable(), kit3.getRef());
+ kit3.expectNoMessage(FiniteDuration.create(100, TimeUnit.MILLISECONDS));
+
+ // Send tx4 CanCommit to B - tx4's participating shard list [A] matches that of tx3 and tx5 - so tx4 should
+ // be moved ahead of tx5 in the queue but not tx3, since tx3 should be in the CAN_COMMIT_PENDING state.
+
+ shardB.tell(new CanCommitTransaction(txId4, CURRENT_VERSION).toSerializable(), kit4.getRef());
+ kit4.expectNoMessage(FiniteDuration.create(100, TimeUnit.MILLISECONDS));
+
+ // Send tx5 CanCommit to B - its position in the queue should remain the same.
+
+ shardB.tell(new CanCommitTransaction(txId5, CURRENT_VERSION).toSerializable(), kit5.getRef());
+ kit5.expectNoMessage(FiniteDuration.create(100, TimeUnit.MILLISECONDS));
+
+ // Finish commit of tx1.
+
+ shardB.tell(new CommitTransaction(txId1, CURRENT_VERSION).toSerializable(), kit1.getRef());
+ kit1.expectMsgClass(CommitTransactionReply.class);
+
+ // Finish commit of tx2.
+
+ shardB.tell(new CanCommitTransaction(txId2, CURRENT_VERSION).toSerializable(), kit2.getRef());
+ kit2.expectMsgClass(CanCommitTransactionReply.class);
+
+ shardB.tell(new CommitTransaction(txId2, CURRENT_VERSION).toSerializable(), kit2.getRef());
+ kit2.expectMsgClass(CommitTransactionReply.class);
+
+ // Finish commit of tx3.
+
+ // From shard B
+ kit3.expectMsgClass(CanCommitTransactionReply.class);
+
+ shardA.tell(new CommitTransaction(txId3, CURRENT_VERSION).toSerializable(), kit3.getRef());
+ kit3.expectMsgClass(CommitTransactionReply.class);
+
+ shardB.tell(new CommitTransaction(txId3, CURRENT_VERSION).toSerializable(), kit3.getRef());
+ kit3.expectMsgClass(CommitTransactionReply.class);
+
+ // Finish commit of tx4.
+
+ // From shard B
+ kit4.expectMsgClass(CanCommitTransactionReply.class);
+
+ shardA.tell(new CanCommitTransaction(txId4, CURRENT_VERSION).toSerializable(), kit4.getRef());
+ kit4.expectMsgClass(CanCommitTransactionReply.class);
+ shardA.tell(new CommitTransaction(txId4, CURRENT_VERSION).toSerializable(), kit4.getRef());
+ kit4.expectMsgClass(CommitTransactionReply.class);
+
+ shardB.tell(new CommitTransaction(txId4, CURRENT_VERSION).toSerializable(), kit4.getRef());
+ kit4.expectMsgClass(CommitTransactionReply.class);
+
+ // Finish commit of tx5.
+
+ // From shard B
+ kit5.expectMsgClass(CanCommitTransactionReply.class);
+
+ shardA.tell(new CanCommitTransaction(txId5, CURRENT_VERSION).toSerializable(), kit5.getRef());
+ kit5.expectMsgClass(CanCommitTransactionReply.class);
+ shardA.tell(new CommitTransaction(txId5, CURRENT_VERSION).toSerializable(), kit5.getRef());
+ kit5.expectMsgClass(CommitTransactionReply.class);
+
+ shardB.tell(new CommitTransaction(txId5, CURRENT_VERSION).toSerializable(), kit5.getRef());
+ kit5.expectMsgClass(CommitTransactionReply.class);
+
+ verifyOuterListEntry(shardA, 1);
+ verifyInnerListEntry(shardB, 1, "one");
+
+ LOG.info("{} ending", testName);
+ }
+
+ /**
+ * Test 2 tx's accessing 2 shards, the second in common.
+ * <pre>
+ * tx1 -> shard A, shard C
+ * tx2 -> shard B, shard C
+ * </pre>
+ * The tx's are readied such that the pendingTransactions queues are as follows:
+ * <pre>
+ * Queue for shard A -> tx1
+ * Queue for shard B -> tx2
+ * Queue for shard C -> tx2, tx1
+ * </pre>
+ * When the tx's are committed, verify the ready order is preserved.
+ */
+ @Test
+ public void testTwoTransactionsWithOneCommonParticipatingShard1() throws Exception {
+ final String testName = "testTwoTransactionsWithOneCommonParticipatingShard1";
+ LOG.info("{} starting", testName);
+
+ final TestKit kit1 = new TestKit(getSystem());
+ final TestKit kit2 = new TestKit(getSystem());
+
+ final ShardIdentifier shardAId = ShardIdentifier.create("shardA", MemberName.forName(testName), "config");
+ final ShardIdentifier shardBId = ShardIdentifier.create("shardB", MemberName.forName(testName), "config");
+ final ShardIdentifier shardCId = ShardIdentifier.create("shardC", MemberName.forName(testName), "config");
+
+ final TestActorRef<Shard> shardA = actorFactory.createTestActor(
+ newShardBuilder().id(shardAId).props().withDispatcher(Dispatchers.DefaultDispatcherId()));
+ ShardTestKit.waitUntilLeader(shardA);
+
+ final TestActorRef<Shard> shardB = actorFactory.createTestActor(
+ newShardBuilder().id(shardBId).props().withDispatcher(Dispatchers.DefaultDispatcherId()));
+ ShardTestKit.waitUntilLeader(shardB);
+
+ final TestActorRef<Shard> shardC = actorFactory.createTestActor(
+ newShardBuilder().id(shardCId).props().withDispatcher(Dispatchers.DefaultDispatcherId()));
+ ShardTestKit.waitUntilLeader(shardC);
+
+ final TransactionIdentifier txId1 = nextTransactionId();
+ final TransactionIdentifier txId2 = nextTransactionId();
+
+ SortedSet<String> participatingShardNames1 =
+ ImmutableSortedSet.of(shardAId.getShardName(), shardCId.getShardName());
+ SortedSet<String> participatingShardNames2 =
+ ImmutableSortedSet.of(shardBId.getShardName(), shardCId.getShardName());
+
+ // Ready [tx1] on shard A.
+
+ shardA.tell(newReadyBatchedModifications(txId1, TEST_PATH,
+ ImmutableNodes.containerNode(TEST_QNAME), participatingShardNames1), kit1.getRef());
+ kit1.expectMsgClass(ReadyTransactionReply.class);
+
+ // Ready [tx2] on shard B.
+
+ shardB.tell(newReadyBatchedModifications(txId2, TEST_PATH,
+ ImmutableNodes.containerNode(TEST_QNAME), participatingShardNames2), kit2.getRef());
+ kit2.expectMsgClass(ReadyTransactionReply.class);
+
+ // Ready [tx2, tx1] on shard C.
+
+ shardC.tell(newReadyBatchedModifications(txId2, TEST_PATH,
+ ImmutableNodes.containerNode(TEST_QNAME), participatingShardNames2), kit2.getRef());
+ kit2.expectMsgClass(ReadyTransactionReply.class);
+
+ shardC.tell(newReadyBatchedModifications(txId1, OUTER_LIST_PATH, outerNode(1),
+ participatingShardNames1), kit1.getRef());
+ kit1.expectMsgClass(ReadyTransactionReply.class);
+
+ // Send tx1 CanCommit to A - should succeed.
+
+ shardA.tell(new CanCommitTransaction(txId1, CURRENT_VERSION).toSerializable(), kit1.getRef());
+ kit1.expectMsgClass(CanCommitTransactionReply.class);
+
+ // Send tx2 CanCommit to B - should succeed.
+
+ shardB.tell(new CanCommitTransaction(txId2, CURRENT_VERSION).toSerializable(), kit2.getRef());
+ kit2.expectMsgClass(CanCommitTransactionReply.class);
+
+ // Send tx1 CanCommit to C - tx2 is at the head of the queue but the preceding shards in tx1's participating
+ // shard list [A] do not match that of tx2 [B] so tx1 should not be allowed to proceed.
+
+ shardC.tell(new CanCommitTransaction(txId1, CURRENT_VERSION).toSerializable(), kit1.getRef());
+ kit1.expectNoMessage(FiniteDuration.create(100, TimeUnit.MILLISECONDS));
+
+ // Send tx2 CanCommit to C - it's at the head of the queue so should proceed.
+
+ shardC.tell(new CanCommitTransaction(txId2, CURRENT_VERSION).toSerializable(), kit2.getRef());
+ kit2.expectMsgClass(CanCommitTransactionReply.class);
+
+ // Finish commit of tx2.
+
+ shardB.tell(new CommitTransaction(txId2, CURRENT_VERSION).toSerializable(), kit2.getRef());
+ kit2.expectMsgClass(CommitTransactionReply.class);
+
+ shardC.tell(new CommitTransaction(txId2, CURRENT_VERSION).toSerializable(), kit2.getRef());
+ kit2.expectMsgClass(CommitTransactionReply.class);
+
+ // Finish commit of tx1.
+
+ kit1.expectMsgClass(CanCommitTransactionReply.class);
+ shardA.tell(new CommitTransaction(txId1, CURRENT_VERSION).toSerializable(), kit1.getRef());
+ kit1.expectMsgClass(CommitTransactionReply.class);
+
+ shardC.tell(new CommitTransaction(txId1, CURRENT_VERSION).toSerializable(), kit1.getRef());
+ kit1.expectMsgClass(CommitTransactionReply.class);
+
+ // Verify data in the data store.
+
+ verifyOuterListEntry(shardC, 1);
+
+ LOG.info("{} ending", testName);
+ }
+
+ /**
+ * Test 2 tx's accessing 2 shards, the first for one and the second for the other in common.
+ * <pre>
+ * tx1 -> shard A, shard B
+ * tx2 -> shard B, shard C
+ * </pre>
+ * The tx's are readied such that the pendingTransactions queues are as follows:
+ * <pre>
+ * Queue for shard A -> tx1
+ * Queue for shard B -> tx2, tx1
+ * Queue for shard C -> tx2
+ * </pre>
+ * When the tx's are committed, verify the ready order is preserved.
+ */
+ @Test
+ public void testTwoTransactionsWithOneCommonParticipatingShard2() throws Exception {
+ final String testName = "testTwoTransactionsWithOneCommonParticipatingShard2";
+ LOG.info("{} starting", testName);
+
+ final TestKit kit1 = new TestKit(getSystem());
+ final TestKit kit2 = new TestKit(getSystem());
+
+ final ShardIdentifier shardAId = ShardIdentifier.create("shardA", MemberName.forName(testName), "config");
+ final ShardIdentifier shardBId = ShardIdentifier.create("shardB", MemberName.forName(testName), "config");
+ final ShardIdentifier shardCId = ShardIdentifier.create("shardC", MemberName.forName(testName), "config");
+
+ final TestActorRef<Shard> shardA = actorFactory.createTestActor(
+ newShardBuilder().id(shardAId).props().withDispatcher(Dispatchers.DefaultDispatcherId()));
+ ShardTestKit.waitUntilLeader(shardA);
+
+ final TestActorRef<Shard> shardB = actorFactory.createTestActor(
+ newShardBuilder().id(shardBId).props().withDispatcher(Dispatchers.DefaultDispatcherId()));
+ ShardTestKit.waitUntilLeader(shardB);
+
+ final TestActorRef<Shard> shardC = actorFactory.createTestActor(
+ newShardBuilder().id(shardCId).props().withDispatcher(Dispatchers.DefaultDispatcherId()));
+ ShardTestKit.waitUntilLeader(shardC);
+
+ final TransactionIdentifier txId1 = nextTransactionId();
+ final TransactionIdentifier txId2 = nextTransactionId();
+
+ SortedSet<String> participatingShardNames1 =
+ ImmutableSortedSet.of(shardAId.getShardName(), shardBId.getShardName());
+ SortedSet<String> participatingShardNames2 =
+ ImmutableSortedSet.of(shardBId.getShardName(), shardCId.getShardName());
+
+ // Ready [tx1] on shard A.
+
+ shardA.tell(newReadyBatchedModifications(txId1, TEST_PATH,
+ ImmutableNodes.containerNode(TEST_QNAME), participatingShardNames1), kit1.getRef());
+ kit1.expectMsgClass(ReadyTransactionReply.class);
+
+ // Ready [tx2, tx1] on shard B.
+
+ shardB.tell(newReadyBatchedModifications(txId2, TEST_PATH,
+ ImmutableNodes.containerNode(TEST_QNAME), participatingShardNames2), kit2.getRef());
+ kit2.expectMsgClass(ReadyTransactionReply.class);
+
+ shardB.tell(newReadyBatchedModifications(txId1, OUTER_LIST_PATH, outerNode(1),
+ participatingShardNames1), kit1.getRef());
+ kit1.expectMsgClass(ReadyTransactionReply.class);
+
+ // Ready [tx2] on shard C.
+
+ shardC.tell(newReadyBatchedModifications(txId2, TEST_PATH,
+ ImmutableNodes.containerNode(TEST_QNAME), participatingShardNames2), kit2.getRef());
+ kit2.expectMsgClass(ReadyTransactionReply.class);
+
+ // Send tx1 CanCommit to A - should succeed.
+
+ shardA.tell(new CanCommitTransaction(txId1, CURRENT_VERSION).toSerializable(), kit1.getRef());
+ kit1.expectMsgClass(CanCommitTransactionReply.class);
+
+ // Send tx1 CanCommit to B - tx2 is at the head of the queue but the preceding shards in tx1's participating
+ // shard list [A] do not match that of tx2 [] so tx1 should not be allowed to proceed.
+
+ shardB.tell(new CanCommitTransaction(txId1, CURRENT_VERSION).toSerializable(), kit1.getRef());
+ kit1.expectNoMessage(FiniteDuration.create(100, TimeUnit.MILLISECONDS));
+
+ // Send tx2 CanCommit to B - it's at the head of the queue so should proceed.
+
+ shardB.tell(new CanCommitTransaction(txId2, CURRENT_VERSION).toSerializable(), kit2.getRef());
+ kit2.expectMsgClass(CanCommitTransactionReply.class);
+
+ // Finish commit of tx2.
+
+ shardC.tell(new CanCommitTransaction(txId2, CURRENT_VERSION).toSerializable(), kit2.getRef());
+ kit2.expectMsgClass(CanCommitTransactionReply.class);
+
+ shardB.tell(new CommitTransaction(txId2, CURRENT_VERSION).toSerializable(), kit2.getRef());
+ kit2.expectMsgClass(CommitTransactionReply.class);
+
+ shardC.tell(new CommitTransaction(txId2, CURRENT_VERSION).toSerializable(), kit2.getRef());
+ kit2.expectMsgClass(CommitTransactionReply.class);
+
+ // Finish commit of tx1.
+
+ kit1.expectMsgClass(CanCommitTransactionReply.class);
+ shardA.tell(new CommitTransaction(txId1, CURRENT_VERSION).toSerializable(), kit1.getRef());
+ kit1.expectMsgClass(CommitTransactionReply.class);
+
+ shardB.tell(new CommitTransaction(txId1, CURRENT_VERSION).toSerializable(), kit1.getRef());
+ kit1.expectMsgClass(CommitTransactionReply.class);
+
+ // Verify data in the data store.
+
+ verifyOuterListEntry(shardB, 1);
+
+ LOG.info("{} ending", testName);
+ }
+
+ static void verifyInnerListEntry(TestActorRef<Shard> shard, int outerID, String innerID)
+ throws Exception {
+ final YangInstanceIdentifier path = innerEntryPath(outerID, innerID);
+ final NormalizedNode<?, ?> innerListEntry = readStore(shard, path);
+ assertNotNull(path + " not found", innerListEntry);
+ }
+}
snapshot.write(PeopleModel.BASE_PATH, PeopleModel.create());
}
- final ShardDataTreeCohort cohort = shardDataTree.finishTransaction(transaction);
+ final ShardDataTreeCohort cohort = shardDataTree.finishTransaction(transaction, Optional.empty());
immediateCanCommit(cohort);
immediatePreCommit(cohort);
shardDataTree.newReadWriteTransaction(nextTransactionId());
final DataTreeModification snapshot = transaction.getSnapshot();
operation.execute(snapshot);
- return shardDataTree.finishTransaction(transaction);
+ return shardDataTree.finishTransaction(transaction, Optional.empty());
}
@SuppressWarnings({ "rawtypes", "unchecked" })
shardDataTree.newReadWriteTransaction(nextTransactionId());
final DataTreeModification snapshot = transaction.getSnapshot();
operation.execute(snapshot);
- final ShardDataTreeCohort cohort = shardDataTree.finishTransaction(transaction);
+ final ShardDataTreeCohort cohort = shardDataTree.finishTransaction(transaction, Optional.empty());
immediateCanCommit(cohort);
immediatePreCommit(cohort);
for (final DataTreeCandidate candidateTip : candidates) {
DataTreeCandidates.applyToModification(snapshot, candidateTip);
}
- final ShardDataTreeCohort cohort = shardDataTree.finishTransaction(transaction);
+ final ShardDataTreeCohort cohort = shardDataTree.finishTransaction(transaction, Optional.empty());
immediateCanCommit(cohort);
immediatePreCommit(cohort);
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
final TransactionIdentifier transactionID = nextTransactionId();
final BatchedModifications batched = new BatchedModifications(transactionID,
DataStoreVersions.CURRENT_VERSION);
- batched.setReady(true);
+ batched.setReady();
batched.setTotalMessagesSent(2);
shard.tell(batched, getRef());
final Throwable cause = failure.cause();
batched = new BatchedModifications(transactionID, DataStoreVersions.CURRENT_VERSION);
- batched.setReady(true);
+ batched.setReady();
batched.setTotalMessagesSent(2);
shard.tell(batched, getRef());
failure = expectMsgClass(Failure.class);
assertEquals("Failure cause type", NoShardLeaderException.class, failure.cause().getClass());
- shard.tell(new ReadyLocalTransaction(txId, mock(DataTreeModification.class), true), getRef());
+ shard.tell(new ReadyLocalTransaction(txId, mock(DataTreeModification.class), true, Optional.empty()),
+ getRef());
failure = expectMsgClass(Failure.class);
assertEquals("Failure cause type", NoShardLeaderException.class, failure.cause().getClass());
}
final TransactionIdentifier txId = nextTransactionId();
modification.ready();
- final ReadyLocalTransaction readyMessage = new ReadyLocalTransaction(txId, modification, true);
+ final ReadyLocalTransaction readyMessage =
+ new ReadyLocalTransaction(txId, modification, true, Optional.empty());
shard.tell(readyMessage, getRef());
final TransactionIdentifier txId = nextTransactionId();
modification.ready();
- final ReadyLocalTransaction readyMessage = new ReadyLocalTransaction(txId, modification, false);
+ final ReadyLocalTransaction readyMessage =
+ new ReadyLocalTransaction(txId, modification, false, Optional.empty());
shard.tell(readyMessage, getRef());
.apply(modification3);
modification3.ready();
final ReadyLocalTransaction readyMessage = new ReadyLocalTransaction(transactionID3, modification3,
- true);
+ true, Optional.empty());
shard.tell(readyMessage, getRef());
// Commit the first Tx. After completing, the second should
assertEquals("getNumBatched", 1, reply.getNumBatched());
batched = new BatchedModifications(tx1, DataStoreVersions.CURRENT_VERSION);
- batched.setReady(true);
+ batched.setReady();
batched.setTotalMessagesSent(2);
transaction.tell(batched, getRef());
BatchedModifications batched = new BatchedModifications(nextTransactionId(),
DataStoreVersions.CURRENT_VERSION);
batched.addModification(new WriteModification(writePath, writeData));
- batched.setReady(true);
+ batched.setReady();
batched.setDoCommitOnReady(true);
batched.setTotalMessagesSent(1);
expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
batched = new BatchedModifications(tx1, DataStoreVersions.CURRENT_VERSION);
- batched.setReady(true);
+ batched.setReady();
batched.setTotalMessagesSent(2);
transaction.tell(batched, getRef());
BatchedModifications batched = new BatchedModifications(nextTransactionId(),
DataStoreVersions.CURRENT_VERSION);
- batched.setReady(true);
+ batched.setReady();
batched.setTotalMessagesSent(2);
transaction.tell(batched, getRef());
doReturn(Optional.empty()).when(mockUserCohorts).abort();
cohort = new SimpleShardDataTreeCohort(mockShardDataTree, mockModification, nextTransactionId(),
- mockUserCohorts);
+ mockUserCohorts, Optional.empty());
}
@Test
import akka.util.Timeout;
import com.google.common.base.Optional;
import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableSortedSet;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.CheckedFuture;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Uninterruptibles;
import java.util.Collection;
import java.util.List;
+import java.util.SortedSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.Assert;
import org.junit.Test;
+import org.mockito.ArgumentCaptor;
import org.mockito.InOrder;
import org.mockito.Mockito;
import org.opendaylight.controller.cluster.access.concepts.MemberName;
import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
+import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction;
import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
public void testReadyWithMultipleShardWrites() throws Exception {
ActorRef actorRef1 = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
- ActorRef actorRef2 = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY, "junk");
+ ActorRef actorRef2 = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY,
+ TestModel.JUNK_QNAME.getLocalName());
expectBatchedModificationsReady(actorRef1);
expectBatchedModificationsReady(actorRef2);
+ ActorRef actorRef3 = getSystem().actorOf(Props.create(DoNothingActor.class));
+
+ doReturn(getSystem().actorSelection(actorRef3.path())).when(mockActorContext)
+ .actorSelection(actorRef3.path().toString());
+
+ doReturn(Futures.successful(newPrimaryShardInfo(actorRef3, createDataTree()))).when(mockActorContext)
+ .findPrimaryShardAsync(eq(CarsModel.BASE_QNAME.getLocalName()));
+
+ expectReadyLocalTransaction(actorRef3, false);
+
TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
transactionProxy.write(TestModel.JUNK_PATH, ImmutableNodes.containerNode(TestModel.JUNK_QNAME));
transactionProxy.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
+ transactionProxy.write(CarsModel.BASE_PATH, ImmutableNodes.containerNode(CarsModel.BASE_QNAME));
DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
verifyCohortFutures((ThreePhaseCommitCohortProxy)ready, actorSelection(actorRef1),
- actorSelection(actorRef2));
+ actorSelection(actorRef2), actorSelection(actorRef3));
+
+ SortedSet<String> expShardNames =
+ ImmutableSortedSet.of(DefaultShardStrategy.DEFAULT_SHARD,
+ TestModel.JUNK_QNAME.getLocalName(), CarsModel.BASE_QNAME.getLocalName());
+
+ ArgumentCaptor<BatchedModifications> batchedMods = ArgumentCaptor.forClass(BatchedModifications.class);
+ verify(mockActorContext).executeOperationAsync(
+ eq(actorSelection(actorRef1)), batchedMods.capture(), any(Timeout.class));
+ assertEquals("Participating shards present", true,
+ batchedMods.getValue().getParticipatingShardNames().isPresent());
+ assertEquals("Participating shards", expShardNames, batchedMods.getValue().getParticipatingShardNames().get());
+
+ batchedMods = ArgumentCaptor.forClass(BatchedModifications.class);
+ verify(mockActorContext).executeOperationAsync(
+ eq(actorSelection(actorRef2)), batchedMods.capture(), any(Timeout.class));
+ assertEquals("Participating shards present", true,
+ batchedMods.getValue().getParticipatingShardNames().isPresent());
+ assertEquals("Participating shards", expShardNames, batchedMods.getValue().getParticipatingShardNames().get());
+
+ ArgumentCaptor<ReadyLocalTransaction> readyLocalTx = ArgumentCaptor.forClass(ReadyLocalTransaction.class);
+ verify(mockActorContext).executeOperationAsync(
+ eq(actorSelection(actorRef3)), readyLocalTx.capture(), any(Timeout.class));
+ assertEquals("Participating shards present", true,
+ readyLocalTx.getValue().getParticipatingShardNames().isPresent());
+ assertEquals("Participating shards", expShardNames, readyLocalTx.getValue().getParticipatingShardNames().get());
}
@Test
DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
assertTrue(ready instanceof SingleCommitCohortProxy);
verifyCohortFutures((SingleCommitCohortProxy)ready, new CommitTransactionReply().toSerializable());
+
+ ArgumentCaptor<ReadyLocalTransaction> readyLocalTx = ArgumentCaptor.forClass(ReadyLocalTransaction.class);
+ verify(mockActorContext).executeOperationAsync(
+ eq(actorSelection(shardActorRef)), readyLocalTx.capture(), any(Timeout.class));
+ assertEquals("Participating shards present", false,
+ readyLocalTx.getValue().getParticipatingShardNames().isPresent());
}
@Test
dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
ActorRef actorRef1 = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
- ActorRef actorRef2 = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY, "junk");
+ ActorRef actorRef2 = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY,
+ TestModel.JUNK_QNAME.getLocalName());
doReturn(Futures.successful(new Object())).when(mockActorContext).executeOperationAsync(
eq(actorSelection(actorRef1)), isA(BatchedModifications.class), any(Timeout.class));
import static org.junit.Assert.assertEquals;
+import com.google.common.collect.ImmutableSortedSet;
import java.io.Serializable;
+import java.util.Optional;
+import java.util.SortedSet;
import org.apache.commons.lang.SerializationUtils;
import org.junit.Test;
import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
batched.addModification(new WriteModification(writePath, writeData));
batched.addModification(new MergeModification(mergePath, mergeData));
batched.addModification(new DeleteModification(deletePath));
- batched.setReady(true);
+ assertEquals("isReady", false, batched.isReady());
+ batched.setReady();
+ assertEquals("isReady", true, batched.isReady());
batched.setTotalMessagesSent(5);
BatchedModifications clone = (BatchedModifications) SerializationUtils.clone(
assertEquals("getVersion", DataStoreVersions.CURRENT_VERSION, clone.getVersion());
assertEquals("getTransactionID", tx1, clone.getTransactionId());
assertEquals("isReady", true, clone.isReady());
+ assertEquals("isDoCommitOnReady", false, clone.isDoCommitOnReady());
+ assertEquals("participatingShardNames present", false, clone.getParticipatingShardNames().isPresent());
assertEquals("getTotalMessagesSent", 5, clone.getTotalMessagesSent());
assertEquals("getModifications size", 3, clone.getModifications().size());
assertEquals("getVersion", DataStoreVersions.CURRENT_VERSION, delete.getVersion());
assertEquals("getPath", deletePath, delete.getPath());
- // Test with different params.
+ // Test with participating shard names.
+
final TransactionIdentifier tx2 = nextTransactionId();
batched = new BatchedModifications(tx2, (short)10000);
+ final SortedSet<String> shardNames = ImmutableSortedSet.of("one", "two");
+ batched.setReady(Optional.of(shardNames));
+ batched.setDoCommitOnReady(true);
+ assertEquals("isReady", true, batched.isReady());
clone = (BatchedModifications) SerializationUtils.clone((Serializable) batched.toSerializable());
assertEquals("getVersion", DataStoreVersions.CURRENT_VERSION, clone.getVersion());
assertEquals("getTransactionID", tx2, clone.getTransactionId());
- assertEquals("isReady", false, clone.isReady());
+ assertEquals("isReady", true, clone.isReady());
+ assertEquals("isDoCommitOnReady", true, clone.isDoCommitOnReady());
+ assertEquals("participatingShardNames present", true, clone.getParticipatingShardNames().isPresent());
+ assertEquals("participatingShardNames", shardNames, clone.getParticipatingShardNames().get());
+ assertEquals("getModifications size", 0, clone.getModifications().size());
+
+ // Test not ready.
+ batched = new BatchedModifications(tx2, DataStoreVersions.CURRENT_VERSION);
+
+ clone = (BatchedModifications) SerializationUtils.clone((Serializable) batched.toSerializable());
+
+ assertEquals("getVersion", DataStoreVersions.CURRENT_VERSION, clone.getVersion());
+ assertEquals("getTransactionID", tx2, clone.getTransactionId());
+ assertEquals("isReady", false, clone.isReady());
assertEquals("getModifications size", 0, clone.getModifications().size());
+ // Test pre-Flourine
+
+ batched = new BatchedModifications(tx2, DataStoreVersions.BORON_VERSION);
+ batched.addModification(new WriteModification(writePath, writeData));
+ batched.setReady(Optional.of(ImmutableSortedSet.of("one", "two")));
+
+ clone = (BatchedModifications) SerializationUtils.clone((Serializable) batched.toSerializable());
+
+ assertEquals("getVersion", DataStoreVersions.BORON_VERSION, clone.getVersion());
+ assertEquals("getTransactionID", tx2, clone.getTransactionId());
+ assertEquals("isReady", true, clone.isReady());
+ assertEquals("participatingShardNames present", false, clone.getParticipatingShardNames().isPresent());
+ assertEquals("getModifications size", 1, clone.getModifications().size());
}
@Test
import akka.actor.ExtendedActorSystem;
import akka.testkit.JavaTestKit;
+import com.google.common.collect.ImmutableSortedSet;
import java.io.NotSerializableException;
import java.util.List;
+import java.util.Optional;
+import java.util.SortedSet;
import org.junit.Test;
import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.AbstractTest;
MapNode mergeData = ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build();
new MergeModification(TestModel.OUTER_LIST_PATH, mergeData).apply(modification);
+ final SortedSet<String> shardNames = ImmutableSortedSet.of("one", "two");
TransactionIdentifier txId = nextTransactionId();
- ReadyLocalTransaction readyMessage = new ReadyLocalTransaction(txId, modification, true);
+ ReadyLocalTransaction readyMessage = new ReadyLocalTransaction(txId, modification, true,
+ Optional.of(shardNames));
final ExtendedActorSystem system = (ExtendedActorSystem) ExtendedActorSystem.create("test");
final Object deserialized;
BatchedModifications batched = (BatchedModifications)deserialized;
assertEquals("getTransactionID", txId, batched.getTransactionId());
assertEquals("getVersion", DataStoreVersions.CURRENT_VERSION, batched.getVersion());
+ assertEquals("isReady", true, batched.isReady());
+ assertEquals("isDoCommitOnReady", true, batched.isDoCommitOnReady());
+ assertEquals("participatingShardNames present", true, batched.getParticipatingShardNames().isPresent());
+ assertEquals("participatingShardNames", shardNames, batched.getParticipatingShardNames().get());
List<Modification> batchedMods = batched.getModifications();
assertEquals("getModifications size", 2, batchedMods.size());
return YangParserTestUtils.parseYangResource(DATASTORE_TEST_YANG);
}
+ public static DataContainerChild<?, ?> outerMapNode() {
+ return ImmutableNodes.mapNodeBuilder(OUTER_LIST_QNAME).build();
+ }
+
public static DataContainerChild<?, ?> outerNode(final int... ids) {
CollectionNodeBuilder<MapEntryNode, MapNode> outer = ImmutableNodes.mapNodeBuilder(OUTER_LIST_QNAME);
for (int id: ids) {
// Builds the instance identifier for a single inner-list entry:
// /outer-list[id]/inner-list[name].
public static YangInstanceIdentifier innerEntryPath(final int id, final String name) {
return OUTER_LIST_PATH.node(outerEntryKey(id)).node(INNER_LIST_QNAME).node(innerEntryKey(name));
}
+
+ /**
+ * Builds the instance identifier for the whole inner-list under the
+ * outer-list entry with the given id: /outer-list[id]/inner-list.
+ */
+ public static YangInstanceIdentifier innerMapPath(final int id) {
+ return OUTER_LIST_PATH.node(outerEntryKey(id)).node(INNER_LIST_QNAME);
+ }
}
loggers = ["akka.testkit.TestEventListener", "akka.event.slf4j.Slf4jLogger"]
actor {
+ warn-about-java-serializer-usage = false
}
}
serialization-bindings {
"org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction" = readylocal
}
+
+ warn-about-java-serializer-usage = false
}
remote {
log-remote-lifecycle-events = off
serialization-bindings {
"org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction" = readylocal
}
+
+ warn-about-java-serializer-usage = false
}
remote {
log-remote-lifecycle-events = off
serialization-bindings {
"org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction" = readylocal
}
+
+ warn-about-java-serializer-usage = false
}
remote {
log-remote-lifecycle-events = off
serialization-bindings {
"org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction" = readylocal
}
+
+ warn-about-java-serializer-usage = false
}
remote {
log-remote-lifecycle-events = off
serialization-bindings {
"org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction" = readylocal
}
+
+ warn-about-java-serializer-usage = false
}
remote {
log-remote-lifecycle-events = off
serialization-bindings {
"org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction" = readylocal
}
+
+ warn-about-java-serializer-usage = false
}
remote {
log-remote-lifecycle-events = off
serialization-bindings {
"org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction" = readylocal
}
+
+ warn-about-java-serializer-usage = false
}
remote {
log-remote-lifecycle-events = off