*/
package org.opendaylight.controller.cluster.datastore;
-import akka.actor.ActorSelection;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
import java.util.List;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
import scala.concurrent.Future;
* implementation. In addition to the usual set of methods it also contains the list of actor
* futures.
*/
-abstract class AbstractThreePhaseCommitCohort implements DOMStoreThreePhaseCommitCohort {
- abstract List<Future<ActorSelection>> getCohortFutures();
+public abstract class AbstractThreePhaseCommitCohort<T> implements DOMStoreThreePhaseCommitCohort {
+ protected static final ListenableFuture<Void> IMMEDIATE_VOID_SUCCESS = Futures.immediateFuture(null);
+ protected static final ListenableFuture<Boolean> IMMEDIATE_BOOLEAN_SUCCESS = Futures.immediateFuture(Boolean.TRUE);
+
+ abstract List<Future<T>> getCohortFutures();
}
/**
* Stores the ready Futures from the previous Tx in the chain.
*/
- private final List<Future<ActorSelection>> previousReadyFutures;
+ private final List<Future<Object>> previousReadyFutures;
/**
* Stores the ready Futures from this transaction when it is readied.
*/
- private volatile List<Future<ActorSelection>> readyFutures;
+ private volatile List<Future<Object>> readyFutures;
ChainedTransactionProxy(ActorContext actorContext, TransactionType transactionType,
- String transactionChainId, List<Future<ActorSelection>> previousReadyFutures) {
+ String transactionChainId, List<Future<Object>> previousReadyFutures) {
super(actorContext, transactionType, transactionChainId);
this.previousReadyFutures = previousReadyFutures;
}
- List<Future<ActorSelection>> getReadyFutures() {
+ List<Future<Object>> getReadyFutures() {
return readyFutures;
}
return readyFutures != null;
}
+ @SuppressWarnings({ "unchecked", "rawtypes" })
@Override
- public AbstractThreePhaseCommitCohort ready() {
- final AbstractThreePhaseCommitCohort ret = super.ready();
- readyFutures = ret.getCohortFutures();
+ public AbstractThreePhaseCommitCohort<?> ready() {
+ final AbstractThreePhaseCommitCohort<?> ret = super.ready();
+ readyFutures = (List)ret.getCohortFutures();
LOG.debug("onTransactionReady {} pending readyFutures size {} chain {}", getIdentifier(),
readyFutures.size(), getTransactionChainId());
return ret;
}
// Combine the ready Futures into 1.
- Future<Iterable<ActorSelection>> combinedFutures = akka.dispatch.Futures.sequence(
+ Future<Iterable<Object>> combinedFutures = akka.dispatch.Futures.sequence(
previousReadyFutures, getActorContext().getClientDispatcher());
// Add a callback for completion of the combined Futures.
final Promise<ActorSelection> returnPromise = akka.dispatch.Futures.promise();
- OnComplete<Iterable<ActorSelection>> onComplete = new OnComplete<Iterable<ActorSelection>>() {
+ OnComplete<Iterable<Object>> onComplete = new OnComplete<Iterable<Object>>() {
@Override
- public void onComplete(Throwable failure, Iterable<ActorSelection> notUsed) {
+ public void onComplete(Throwable failure, Iterable<Object> notUsed) {
if(failure != null) {
// A Ready Future failed so fail the returned Promise.
returnPromise.failure(failure);
private final DefaultConfigParamsImpl raftConfig = new DefaultConfigParamsImpl();
private String dataStoreType = UNKNOWN_DATA_STORE_TYPE;
private int shardBatchedModificationCount = DEFAULT_SHARD_BATCHED_MODIFICATION_COUNT;
- private boolean writeOnlyTransactionOptimizationsEnabled = false;
+ private boolean writeOnlyTransactionOptimizationsEnabled = true;
public static Set<String> getGlobalDatastoreTypes() {
return globalDatastoreTypes;
*/
package org.opendaylight.controller.cluster.datastore;
-import akka.actor.ActorSelection;
-import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import java.util.Collections;
import java.util.List;
* A {@link org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort}
* instance given out for empty transactions.
*/
-final class NoOpDOMStoreThreePhaseCommitCohort extends AbstractThreePhaseCommitCohort {
+final class NoOpDOMStoreThreePhaseCommitCohort extends AbstractThreePhaseCommitCohort<Object> {
static final NoOpDOMStoreThreePhaseCommitCohort INSTANCE = new NoOpDOMStoreThreePhaseCommitCohort();
- private static final ListenableFuture<Void> IMMEDIATE_VOID_SUCCESS = Futures.immediateFuture(null);
- private static final ListenableFuture<Boolean> IMMEDIATE_BOOLEAN_SUCCESS = Futures.immediateFuture(Boolean.TRUE);
-
private NoOpDOMStoreThreePhaseCommitCohort() {
// Hidden to prevent instantiation
}
}
@Override
- List<Future<ActorSelection>> getCohortFutures() {
+ List<Future<Object>> getCohortFutures() {
return Collections.emptyList();
}
}
LOG.debug("NoOpTransactionContext {} closeTransaction called", getIdentifier());
}
+ @Override
+ public boolean supportsDirectCommit() {
+ return true;
+ }
+
+ @Override
+ public Future<Object> directCommit() {
+ LOG.debug("Tx {} directCommit called, failure: {}", getIdentifier(), failure);
+ operationLimiter.release();
+ return akka.dispatch.Futures.failed(failure);
+ }
+
@Override
public Future<ActorSelection> readyTransaction() {
- LOG.debug("Tx {} readyTransaction called", getIdentifier());
+ LOG.debug("Tx {} readyTransaction called, failure: {}", getIdentifier(), failure);
operationLimiter.release();
return akka.dispatch.Futures.failed(failure);
}
package org.opendaylight.controller.cluster.datastore;
-public interface OperationCallback {
+import java.util.concurrent.atomic.AtomicReference;
+
+interface OperationCallback {
+ OperationCallback NO_OP_CALLBACK = new OperationCallback() {
+ @Override
+ public void run() {
+ }
+
+ @Override
+ public void success() {
+ }
+
+ @Override
+ public void failure() {
+ }
+ };
+
+ class Reference extends AtomicReference<OperationCallback> {
+ private static final long serialVersionUID = 1L;
+
+ public Reference(OperationCallback initialValue) {
+ super(initialValue);
+ }
+ }
+
void run();
void success();
void failure();
import org.opendaylight.controller.cluster.common.actor.CommonConfig;
import org.opendaylight.controller.cluster.common.actor.MeteringBehavior;
import org.opendaylight.controller.cluster.datastore.ShardCommitCoordinator.CohortEntry;
-import org.opendaylight.controller.cluster.datastore.compat.BackwardsCompatibleThreePhaseCommitCohort;
import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardTransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
-import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionChain;
import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
-import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener;
import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
private final MessageTracker appendEntriesReplyTracker;
- private final ReadyTransactionReply READY_TRANSACTION_REPLY = new ReadyTransactionReply(
- Serialization.serializedActorPath(getSelf()));
-
private final DOMTransactionFactory domTransactionFactory;
private final ShardTransactionActorFactory transactionActorFactory;
} else if (BatchedModifications.class.isInstance(message)) {
handleBatchedModifications((BatchedModifications)message);
} else if (message instanceof ForwardedReadyTransaction) {
- handleForwardedReadyTransaction((ForwardedReadyTransaction) message);
+ commitCoordinator.handleForwardedReadyTransaction((ForwardedReadyTransaction) message,
+ getSender(), this);
} else if (CanCommitTransaction.SERIALIZABLE_CLASS.isInstance(message)) {
handleCanCommitTransaction(CanCommitTransaction.fromSerializable(message));
} else if (CommitTransaction.SERIALIZABLE_CLASS.isInstance(message)) {
}
}
- private void handleCommitTransaction(final CommitTransaction commit) {
- final String transactionID = commit.getTransactionID();
-
- LOG.debug("{}: Committing transaction {}", persistenceId(), transactionID);
-
- // Get the current in-progress cohort entry in the commitCoordinator if it corresponds to
- // this transaction.
- final CohortEntry cohortEntry = commitCoordinator.getCohortEntryIfCurrent(transactionID);
- if(cohortEntry == null) {
- // We're not the current Tx - the Tx was likely expired b/c it took too long in
- // between the canCommit and commit messages.
- IllegalStateException ex = new IllegalStateException(
- String.format("%s: Cannot commit transaction %s - it is not the current transaction",
- persistenceId(), transactionID));
- LOG.error(ex.getMessage());
- shardMBean.incrementFailedTransactionsCount();
- getSender().tell(new akka.actor.Status.Failure(ex), getSelf());
- return;
+ void continueCommit(final CohortEntry cohortEntry) throws Exception {
+ // If we do not have any followers and we are not using persistence
+ // or if cohortEntry has no modifications
+ // we can apply modification to the state immediately
+ if((!hasFollowers() && !persistence().isRecoveryApplicable()) || (!cohortEntry.hasModifications())){
+ applyModificationToState(getSender(), cohortEntry.getTransactionID(), cohortEntry.getModification());
+ } else {
+ Shard.this.persistData(getSender(), cohortEntry.getTransactionID(),
+ new ModificationPayload(cohortEntry.getModification()));
}
+ }
- // We perform the preCommit phase here atomically with the commit phase. This is an
- // optimization to eliminate the overhead of an extra preCommit message. We lose front-end
- // coordination of preCommit across shards in case of failure but preCommit should not
- // normally fail since we ensure only one concurrent 3-phase commit.
-
- try {
- // We block on the future here so we don't have to worry about possibly accessing our
- // state on a different thread outside of our dispatcher. Also, the data store
- // currently uses a same thread executor anyway.
- cohortEntry.getCohort().preCommit().get();
-
- // If we do not have any followers and we are not using persistence
- // or if cohortEntry has no modifications
- // we can apply modification to the state immediately
- if((!hasFollowers() && !persistence().isRecoveryApplicable()) || (!cohortEntry.hasModifications())){
- applyModificationToState(getSender(), transactionID, cohortEntry.getModification());
- } else {
- Shard.this.persistData(getSender(), transactionID,
- new ModificationPayload(cohortEntry.getModification()));
- }
- } catch (Exception e) {
- LOG.error("{} An exception occurred while preCommitting transaction {}",
- persistenceId(), cohortEntry.getTransactionID(), e);
+ private void handleCommitTransaction(final CommitTransaction commit) {
+ if(!commitCoordinator.handleCommit(commit.getTransactionID(), getSender(), this)) {
shardMBean.incrementFailedTransactionsCount();
- getSender().tell(new akka.actor.Status.Failure(e), getSelf());
}
-
- cohortEntry.updateLastAccessTime();
}
private void finishCommit(@Nonnull final ActorRef sender, final @Nonnull String transactionID) {
private void handleCanCommitTransaction(final CanCommitTransaction canCommit) {
LOG.debug("{}: Can committing transaction {}", persistenceId(), canCommit.getTransactionID());
- commitCoordinator.handleCanCommit(canCommit, getSender(), self());
+ commitCoordinator.handleCanCommit(canCommit.getTransactionID(), getSender(), this);
}
private void handleBatchedModifications(BatchedModifications batched) {
//
if(isLeader()) {
try {
- boolean ready = commitCoordinator.handleTransactionModifications(batched);
- if(ready) {
- sender().tell(READY_TRANSACTION_REPLY, self());
- } else {
- sender().tell(new BatchedModificationsReply(batched.getModifications().size()), self());
- }
+ commitCoordinator.handleBatchedModifications(batched, getSender(), this);
} catch (Exception e) {
LOG.error("{}: Error handling BatchedModifications for Tx {}", persistenceId(),
batched.getTransactionID(), e);
}
}
- private void handleForwardedReadyTransaction(ForwardedReadyTransaction ready) {
- LOG.debug("{}: Readying transaction {}, client version {}", persistenceId(),
- ready.getTransactionID(), ready.getTxnClientVersion());
-
- // This message is forwarded by the ShardTransaction on ready. We cache the cohort in the
- // commitCoordinator in preparation for the subsequent three phase commit initiated by
- // the front-end.
- commitCoordinator.transactionReady(ready.getTransactionID(), ready.getCohort(),
- (MutableCompositeModification) ready.getModification());
-
- // Return our actor path as we'll handle the three phase commit, except if the Tx client
- // version < 1 (Helium-1 version). This means the Tx was initiated by a base Helium version
- // node. In that case, the subsequent 3-phase commit messages won't contain the
- // transactionId so to maintain backwards compatibility, we create a separate cohort actor
- // to provide the compatible behavior.
- if(ready.getTxnClientVersion() < DataStoreVersions.LITHIUM_VERSION) {
- ActorRef replyActorPath = getSelf();
- if(ready.getTxnClientVersion() < DataStoreVersions.HELIUM_1_VERSION) {
- LOG.debug("{}: Creating BackwardsCompatibleThreePhaseCommitCohort", persistenceId());
- replyActorPath = getContext().actorOf(BackwardsCompatibleThreePhaseCommitCohort.props(
- ready.getTransactionID()));
- }
-
- ReadyTransactionReply readyTransactionReply =
- new ReadyTransactionReply(Serialization.serializedActorPath(replyActorPath),
- ready.getTxnClientVersion());
- getSender().tell(ready.isReturnSerialized() ? readyTransactionReply.toSerializable() :
- readyTransactionReply, getSelf());
- } else {
- getSender().tell(READY_TRANSACTION_REPLY, getSelf());
- }
- }
-
private void handleAbortTransaction(final AbortTransaction abort) {
doAbortTransaction(abort.getTransactionID(), getSender());
}
import java.util.Queue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
+import org.opendaylight.controller.cluster.datastore.compat.BackwardsCompatibleThreePhaseCommitCohort;
import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
-import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
+import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
+import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
+import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
import org.opendaylight.controller.cluster.datastore.modification.Modification;
import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
import org.slf4j.Logger;
private final String name;
- private final String shardActorPath;
-
private final RemovalListener<String, CohortEntry> cacheRemovalListener =
new RemovalListener<String, CohortEntry>() {
@Override
// This is a hook for unit tests to replace or decorate the DOMStoreThreePhaseCommitCohorts.
private CohortDecorator cohortDecorator;
+ private ReadyTransactionReply readyTransactionReply;
+
public ShardCommitCoordinator(DOMTransactionFactory transactionFactory,
long cacheExpiryTimeoutInSec, int queueCapacity, ActorRef shardActor, Logger log, String name) {
this.name = name;
this.transactionFactory = transactionFactory;
- shardActorPath = Serialization.serializedActorPath(shardActor);
-
cohortCache = CacheBuilder.newBuilder().expireAfterAccess(cacheExpiryTimeoutInSec, TimeUnit.SECONDS).
removalListener(cacheRemovalListener).build();
this.queueCapacity = queueCapacity;
}
+ private ReadyTransactionReply readyTransactionReply(Shard shard) {
+ if(readyTransactionReply == null) {
+ readyTransactionReply = new ReadyTransactionReply(Serialization.serializedActorPath(shard.self()));
+ }
+
+ return readyTransactionReply;
+ }
+
/**
* This method is called to ready a transaction that was prepared by ShardTransaction actor. It caches
* the prepared cohort entry for the given transactions ID in preparation for the subsequent 3-phase commit.
- *
- * @param transactionID the ID of the transaction
- * @param cohort the cohort to participate in the transaction commit
- * @param modification the modifications made by the transaction
*/
- public void transactionReady(String transactionID, DOMStoreThreePhaseCommitCohort cohort,
- MutableCompositeModification modification) {
+ public void handleForwardedReadyTransaction(ForwardedReadyTransaction ready, ActorRef sender, Shard shard) {
+ log.debug("{}: Readying transaction {}, client version {}", name,
+ ready.getTransactionID(), ready.getTxnClientVersion());
+
+ CohortEntry cohortEntry = new CohortEntry(ready.getTransactionID(), ready.getCohort(),
+ (MutableCompositeModification) ready.getModification());
+ cohortCache.put(ready.getTransactionID(), cohortEntry);
+
+ if(ready.getTxnClientVersion() < DataStoreVersions.LITHIUM_VERSION) {
+ // Return our actor path as we'll handle the three phase commit except if the Tx client
+ // version < Helium-1 version which means the Tx was initiated by a base Helium version node.
+ // In that case, the subsequent 3-phase commit messages won't contain the transactionId so to
+ // maintain backwards compatibility, we create a separate cohort actor to provide the compatible behavior.
+ ActorRef replyActorPath = shard.self();
+ if(ready.getTxnClientVersion() < DataStoreVersions.HELIUM_1_VERSION) {
+ log.debug("{}: Creating BackwardsCompatibleThreePhaseCommitCohort", name);
+ replyActorPath = shard.getContext().actorOf(BackwardsCompatibleThreePhaseCommitCohort.props(
+ ready.getTransactionID()));
+ }
- cohortCache.put(transactionID, new CohortEntry(transactionID, cohort, modification));
+ ReadyTransactionReply readyTransactionReply =
+ new ReadyTransactionReply(Serialization.serializedActorPath(replyActorPath),
+ ready.getTxnClientVersion());
+ sender.tell(ready.isReturnSerialized() ? readyTransactionReply.toSerializable() :
+ readyTransactionReply, shard.self());
+ } else {
+ if(ready.isDoImmediateCommit()) {
+ cohortEntry.setDoImmediateCommit(true);
+ cohortEntry.setReplySender(sender);
+ cohortEntry.setShard(shard);
+ handleCanCommit(cohortEntry);
+ } else {
+ // The caller does not want immediate commit - the 3-phase commit will be coordinated by the
+ // front-end so send back a ReadyTransactionReply with our actor path.
+ sender.tell(readyTransactionReply(shard), shard.self());
+ }
+ }
}
/**
*
* @throws ExecutionException if an error occurs loading the cache
*/
- public boolean handleTransactionModifications(BatchedModifications batched)
+ boolean handleBatchedModifications(BatchedModifications batched, ActorRef sender, Shard shard)
throws ExecutionException {
CohortEntry cohortEntry = cohortCache.getIfPresent(batched.getTransactionID());
if(cohortEntry == null) {
batched.getTransactionID(), batched.getVersion());
}
- cohortEntry.ready(cohortDecorator);
+ cohortEntry.ready(cohortDecorator, batched.isDoCommitOnReady());
+
+ if(batched.isDoCommitOnReady()) {
+ cohortEntry.setReplySender(sender);
+ cohortEntry.setShard(shard);
+ handleCanCommit(cohortEntry);
+ } else {
+ sender.tell(readyTransactionReply(shard), shard.self());
+ }
+ } else {
+ sender.tell(new BatchedModificationsReply(batched.getModifications().size()), shard.self());
}
return batched.isReady();
}
- /**
- * This method handles the canCommit phase for a transaction.
- *
- * @param canCommit the CanCommitTransaction message
- * @param sender the actor that sent the message
- * @param shard the transaction's shard actor
- */
- public void handleCanCommit(CanCommitTransaction canCommit, final ActorRef sender,
- final ActorRef shard) {
- String transactionID = canCommit.getTransactionID();
+ private void handleCanCommit(CohortEntry cohortEntry) {
+ String transactionID = cohortEntry.getTransactionID();
+
if(log.isDebugEnabled()) {
log.debug("{}: Processing canCommit for transaction {} for shard {}",
- name, transactionID, shard.path());
- }
-
- // Lookup the cohort entry that was cached previously (or should have been) by
- // transactionReady (via the ForwardedReadyTransaction message).
- final CohortEntry cohortEntry = cohortCache.getIfPresent(transactionID);
- if(cohortEntry == null) {
- // Either canCommit was invoked before ready(shouldn't happen) or a long time passed
- // between canCommit and ready and the entry was expired from the cache.
- IllegalStateException ex = new IllegalStateException(
- String.format("%s: No cohort entry found for transaction %s", name, transactionID));
- log.error(ex.getMessage());
- sender.tell(new Status.Failure(ex), shard);
- return;
+ name, transactionID, cohortEntry.getShard().self().path());
}
- cohortEntry.setCanCommitSender(sender);
- cohortEntry.setShard(shard);
-
if(currentCohortEntry != null) {
// There's already a Tx commit in progress - attempt to queue this entry to be
// committed after the current Tx completes.
" capacity %d has been reached.",
name, transactionID, queueCapacity));
log.error(ex.getMessage());
- sender.tell(new Status.Failure(ex), shard);
+ cohortEntry.getReplySender().tell(new Status.Failure(ex), cohortEntry.getShard().self());
}
} else {
// No Tx commit currently in progress - make this the current entry and proceed with
}
}
+ /**
+ * This method handles the canCommit phase for a transaction.
+ *
+ * @param canCommit the CanCommitTransaction message
+ * @param sender the actor that sent the message
+ * @param shard the transaction's shard actor
+ */
+ public void handleCanCommit(String transactionID, final ActorRef sender, final Shard shard) {
+ // Lookup the cohort entry that was cached previously (or should have been) by
+ // transactionReady (via the ForwardedReadyTransaction message).
+ final CohortEntry cohortEntry = cohortCache.getIfPresent(transactionID);
+ if(cohortEntry == null) {
+ // Either canCommit was invoked before ready(shouldn't happen) or a long time passed
+ // between canCommit and ready and the entry was expired from the cache.
+ IllegalStateException ex = new IllegalStateException(
+ String.format("%s: No cohort entry found for transaction %s", name, transactionID));
+ log.error(ex.getMessage());
+ sender.tell(new Status.Failure(ex), shard.self());
+ return;
+ }
+
+ cohortEntry.setReplySender(sender);
+ cohortEntry.setShard(shard);
+
+ handleCanCommit(cohortEntry);
+ }
+
private void doCanCommit(final CohortEntry cohortEntry) {
+ boolean canCommit = false;
try {
// We block on the future here so we don't have to worry about possibly accessing our
// state on a different thread outside of our dispatcher. Also, the data store
// currently uses a same thread executor anyway.
- Boolean canCommit = cohortEntry.getCohort().canCommit().get();
+ canCommit = cohortEntry.getCohort().canCommit().get();
+
+ if(cohortEntry.isDoImmediateCommit()) {
+ if(canCommit) {
+ doCommit(cohortEntry);
+ } else {
+ cohortEntry.getReplySender().tell(new Status.Failure(new TransactionCommitFailedException(
+ "Can Commit failed, no detailed cause available.")), cohortEntry.getShard().self());
+ }
+ } else {
+ cohortEntry.getReplySender().tell(
+ canCommit ? CanCommitTransactionReply.YES.toSerializable() :
+ CanCommitTransactionReply.NO.toSerializable(), cohortEntry.getShard().self());
+ }
+ } catch (Exception e) {
+ log.debug("{}: An exception occurred during canCommit: {}", name, e);
- cohortEntry.getCanCommitSender().tell(
- canCommit ? CanCommitTransactionReply.YES.toSerializable() :
- CanCommitTransactionReply.NO.toSerializable(), cohortEntry.getShard());
+ Throwable failure = e;
+ if(e instanceof ExecutionException) {
+ failure = e.getCause();
+ }
+ cohortEntry.getReplySender().tell(new Status.Failure(failure), cohortEntry.getShard().self());
+ } finally {
if(!canCommit) {
- // Remove the entry from the cache now since the Tx will be aborted.
- removeCohortEntry(cohortEntry.getTransactionID());
+ // Remove the entry from the cache now.
+ currentTransactionComplete(cohortEntry.getTransactionID(), true);
}
- } catch (InterruptedException | ExecutionException e) {
- log.debug("{}: An exception occurred during canCommit: {}", name, e);
+ }
+ }
+
+ private boolean doCommit(CohortEntry cohortEntry) {
+ log.debug("{}: Committing transaction {}", name, cohortEntry.getTransactionID());
- // Remove the entry from the cache now since the Tx will be aborted.
- removeCohortEntry(cohortEntry.getTransactionID());
- cohortEntry.getCanCommitSender().tell(new Status.Failure(e), cohortEntry.getShard());
+ boolean success = false;
+
+ // We perform the preCommit phase here atomically with the commit phase. This is an
+ // optimization to eliminate the overhead of an extra preCommit message. We lose front-end
+ // coordination of preCommit across shards in case of failure but preCommit should not
+ // normally fail since we ensure only one concurrent 3-phase commit.
+
+ try {
+ // We block on the future here so we don't have to worry about possibly accessing our
+ // state on a different thread outside of our dispatcher. Also, the data store
+ // currently uses a same thread executor anyway.
+ cohortEntry.getCohort().preCommit().get();
+
+ cohortEntry.getShard().continueCommit(cohortEntry);
+
+ cohortEntry.updateLastAccessTime();
+
+ success = true;
+ } catch (Exception e) {
+ log.error("{} An exception occurred while preCommitting transaction {}",
+ name, cohortEntry.getTransactionID(), e);
+ cohortEntry.getReplySender().tell(new akka.actor.Status.Failure(e), cohortEntry.getShard().self());
+
+ currentTransactionComplete(cohortEntry.getTransactionID(), true);
+ }
+
+ return success;
+ }
+
+ boolean handleCommit(final String transactionID, final ActorRef sender, final Shard shard) {
+ // Get the current in-progress cohort entry in the commitCoordinator if it corresponds to
+ // this transaction.
+ final CohortEntry cohortEntry = getCohortEntryIfCurrent(transactionID);
+ if(cohortEntry == null) {
+ // We're not the current Tx - the Tx was likely expired b/c it took too long in
+ // between the canCommit and commit messages.
+ IllegalStateException ex = new IllegalStateException(
+ String.format("%s: Cannot commit transaction %s - it is not the current transaction",
+ name, transactionID));
+ log.error(ex.getMessage());
+ sender.tell(new akka.actor.Status.Failure(ex), shard.self());
+ return false;
}
+
+ return doCommit(cohortEntry);
}
/**
private DOMStoreThreePhaseCommitCohort cohort;
private final MutableCompositeModification compositeModification;
private final DOMStoreWriteTransaction transaction;
- private ActorRef canCommitSender;
- private ActorRef shard;
+ private ActorRef replySender;
+ private Shard shard;
private long lastAccessTime;
+ private boolean doImmediateCommit;
CohortEntry(String transactionID, DOMStoreWriteTransaction transaction) {
this.compositeModification = new MutableCompositeModification();
}
}
- void ready(CohortDecorator cohortDecorator) {
+ void ready(CohortDecorator cohortDecorator, boolean doImmediateCommit) {
Preconditions.checkState(cohort == null, "cohort was already set");
+ setDoImmediateCommit(doImmediateCommit);
+
cohort = transaction.ready();
if(cohortDecorator != null) {
}
}
- ActorRef getCanCommitSender() {
- return canCommitSender;
+ boolean isDoImmediateCommit() {
+ return doImmediateCommit;
+ }
+
+ void setDoImmediateCommit(boolean doImmediateCommit) {
+ this.doImmediateCommit = doImmediateCommit;
+ }
+
+ ActorRef getReplySender() {
+ return replySender;
}
- void setCanCommitSender(ActorRef canCommitSender) {
- this.canCommitSender = canCommitSender;
+ void setReplySender(ActorRef replySender) {
+ this.replySender = replySender;
}
- ActorRef getShard() {
+ Shard getShard() {
return shard;
}
- void setShard(ActorRef shard) {
+ void setShard(Shard shard) {
this.shard = shard;
}
if (message instanceof BatchedModifications) {
batchedModifications((BatchedModifications)message);
} else if (message instanceof ReadyTransaction) {
- readyTransaction(transaction, !SERIALIZED_REPLY);
+ readyTransaction(transaction, !SERIALIZED_REPLY, false);
} else if(ReadyTransaction.SERIALIZABLE_CLASS.equals(message.getClass())) {
- readyTransaction(transaction, SERIALIZED_REPLY);
+ readyTransaction(transaction, SERIALIZED_REPLY, false);
} else if(WriteData.isSerializedType(message)) {
writeData(transaction, WriteData.fromSerializable(message), SERIALIZED_REPLY);
totalBatchedModificationsReceived, batched.getTotalMessagesSent()));
}
- readyTransaction(transaction, false);
+ readyTransaction(transaction, false, batched.isDoCommitOnReady());
} else {
getSender().tell(new BatchedModificationsReply(batched.getModifications().size()), getSelf());
}
}
}
- private void readyTransaction(DOMStoreWriteTransaction transaction, boolean returnSerialized) {
+ private void readyTransaction(DOMStoreWriteTransaction transaction, boolean returnSerialized,
+ boolean doImmediateCommit) {
String transactionID = getTransactionID();
LOG.debug("readyTransaction : {}", transactionID);
DOMStoreThreePhaseCommitCohort cohort = transaction.ready();
getShardActor().forward(new ForwardedReadyTransaction(transactionID, getClientTxVersion(),
- cohort, compositeModification, returnSerialized), getContext());
+ cohort, compositeModification, returnSerialized, doImmediateCommit), getContext());
// The shard will handle the commit from here so we're no longer needed - self-destruct.
getSelf().tell(PoisonPill.getInstance(), getSelf());
--- /dev/null
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import akka.actor.ActorSelection;
+import akka.dispatch.Futures;
+import akka.dispatch.OnComplete;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+import java.util.Arrays;
+import java.util.List;
+import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.Future;
+
+/**
+ * A cohort proxy implementation for a single-shard transaction commit. If the transaction was a direct commit
+ * to the shard, this implementation elides the CanCommitTransaction and CommitTransaction messages to the
+ * shard as an optimization. Otherwise the 3-phase commit to the shard is delegated to a
+ * ThreePhaseCommitCohortProxy instance (this is for backwards compatibility with pre-Lithium versions).
+ *
+ * @author Thomas Pantelis
+ */
+class SingleCommitCohortProxy extends AbstractThreePhaseCommitCohort<Object> {
+ private static final Logger LOG = LoggerFactory.getLogger(SingleCommitCohortProxy.class);
+
+ private final ActorContext actorContext;
+ private final Future<Object> cohortFuture;
+ private final String transactionId;
+ private volatile DOMStoreThreePhaseCommitCohort delegateCohort = NoOpDOMStoreThreePhaseCommitCohort.INSTANCE;
+ private final OperationCallback.Reference operationCallbackRef;
+
+ SingleCommitCohortProxy(ActorContext actorContext, Future<Object> cohortFuture, String transactionId,
+ OperationCallback.Reference operationCallbackRef) {
+ this.actorContext = actorContext;
+ this.cohortFuture = cohortFuture;
+ this.transactionId = transactionId;
+ this.operationCallbackRef = operationCallbackRef;
+ }
+
+ @Override
+ public ListenableFuture<Boolean> canCommit() {
+ LOG.debug("Tx {} canCommit", transactionId);
+
+ final SettableFuture<Boolean> returnFuture = SettableFuture.create();
+
+ cohortFuture.onComplete(new OnComplete<Object>() {
+ @Override
+ public void onComplete(Throwable failure, Object cohortResponse) {
+ if(failure != null) {
+ operationCallbackRef.get().failure();
+ returnFuture.setException(failure);
+ return;
+ }
+
+ operationCallbackRef.get().success();
+
+ if(cohortResponse instanceof ActorSelection) {
+ handlePreLithiumActorCohort((ActorSelection)cohortResponse, returnFuture);
+ return;
+ }
+
+ LOG.debug("Tx {} successfully completed direct commit", transactionId);
+
+ // The Future was the result of a direct commit to the shard, essentially eliding the
+ // front-end 3PC coordination. We don't really care about the specific Future
+ // response object, only that it completed successfully. At this point the Tx is complete
+ // so return true. The subsequent preCommit and commit phases will be no-ops, ie return
+ // immediate success, to complete the 3PC for the front-end.
+ returnFuture.set(Boolean.TRUE);
+ }
+ }, actorContext.getClientDispatcher());
+
+ return returnFuture;
+ }
+
+ @Override
+ public ListenableFuture<Void> preCommit() {
+ return delegateCohort.preCommit();
+ }
+
+ @Override
+ public ListenableFuture<Void> abort() {
+ return delegateCohort.abort();
+ }
+
+ @Override
+ public ListenableFuture<Void> commit() {
+ return delegateCohort.commit();
+ }
+
+ @Override
+ List<Future<Object>> getCohortFutures() {
+ return Arrays.asList(cohortFuture);
+ }
+
+ private void handlePreLithiumActorCohort(ActorSelection actorSelection, final SettableFuture<Boolean> returnFuture) {
+ // Handle backwards compatibility. An ActorSelection response would be returned from a
+ // pre-Lithium version. In this case delegate to a ThreePhaseCommitCohortProxy.
+ delegateCohort = new ThreePhaseCommitCohortProxy(actorContext,
+ Arrays.asList(Futures.successful(actorSelection)), transactionId);
+ com.google.common.util.concurrent.Futures.addCallback(delegateCohort.canCommit(), new FutureCallback<Boolean>() {
+ @Override
+ public void onSuccess(Boolean canCommit) {
+ returnFuture.set(canCommit);
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ returnFuture.setException(t);
+ }
+ });
+ }
+}
/**
* ThreePhaseCommitCohortProxy represents a set of remote cohort proxies
*/
-public class ThreePhaseCommitCohortProxy extends AbstractThreePhaseCommitCohort {
+public class ThreePhaseCommitCohortProxy extends AbstractThreePhaseCommitCohort<ActorSelection> {
private static final Logger LOG = LoggerFactory.getLogger(ThreePhaseCommitCohortProxy.class);
- private static final ListenableFuture<Void> IMMEDIATE_SUCCESS =
- com.google.common.util.concurrent.Futures.immediateFuture(null);
-
private final ActorContext actorContext;
private final List<Future<ActorSelection>> cohortFutures;
private volatile List<ActorSelection> cohorts;
private final String transactionId;
- private static final OperationCallback NO_OP_CALLBACK = new OperationCallback() {
- @Override
- public void run() {
- }
-
- @Override
- public void success() {
- }
-
- @Override
- public void failure() {
- }
- };
public ThreePhaseCommitCohortProxy(ActorContext actorContext,
List<Future<ActorSelection>> cohortFutures, String transactionId) {
public ListenableFuture<Void> preCommit() {
// We don't need to do anything here - preCommit is done atomically with the commit phase
// by the shard.
- return IMMEDIATE_SUCCESS;
+ return IMMEDIATE_VOID_SUCCESS;
}
@Override
@Override
public ListenableFuture<Void> commit() {
- OperationCallback operationCallback = cohortFutures.isEmpty() ? NO_OP_CALLBACK :
+ OperationCallback operationCallback = cohortFutures.isEmpty() ? OperationCallback.NO_OP_CALLBACK :
new TransactionRateLimitingCallback(actorContext);
return voidOperation("commit", new CommitTransaction(transactionId).toSerializable(),
private ListenableFuture<Void> voidOperation(final String operationName, final Object message,
final Class<?> expectedResponseClass, final boolean propagateException) {
- return voidOperation(operationName, message, expectedResponseClass, propagateException, NO_OP_CALLBACK);
+ return voidOperation(operationName, message, expectedResponseClass, propagateException,
+ OperationCallback.NO_OP_CALLBACK);
}
private ListenableFuture<Void> voidOperation(final String operationName, final Object message,
package org.opendaylight.controller.cluster.datastore;
-import akka.actor.ActorSelection;
import com.google.common.base.Preconditions;
import java.util.Collections;
import java.util.List;
private interface State {
boolean isReady();
- List<Future<ActorSelection>> getPreviousReadyFutures();
+ List<Future<Object>> getPreviousReadyFutures();
}
private static class Allocated implements State {
}
@Override
- public List<Future<ActorSelection>> getPreviousReadyFutures() {
+ public List<Future<Object>> getPreviousReadyFutures() {
return transaction.getReadyFutures();
}
}
private static abstract class AbstractDefaultState implements State {
@Override
- public List<Future<ActorSelection>> getPreviousReadyFutures() {
+ public List<Future<Object>> getPreviousReadyFutures() {
return Collections.emptyList();
}
}
void readData(final YangInstanceIdentifier path, SettableFuture<Optional<NormalizedNode<?, ?>>> proxyFuture);
void dataExists(YangInstanceIdentifier path, SettableFuture<Boolean> proxyFuture);
+
+ boolean supportsDirectCommit();
+
+ Future<Object> directCommit();
}
actorContext.sendOperationAsync(getActor(), CloseTransaction.INSTANCE.toSerializable());
}
+ @Override
+ public boolean supportsDirectCommit() {
+ return true;
+ }
+
+ @Override
+ public Future<Object> directCommit() {
+ LOG.debug("Tx {} directCommit called", getIdentifier());
+
+ // Send the remaining batched modifications, if any, with the ready flag set.
+
+ return sendBatchedModifications(true, true);
+ }
+
@Override
public Future<ActorSelection> readyTransaction() {
LOG.debug("Tx {} readyTransaction called", getIdentifier());
// Send the remaining batched modifications, if any, with the ready flag set.
- Future<Object> lastModificationsFuture = sendBatchedModifications(true);
+ Future<Object> lastModificationsFuture = sendBatchedModifications(true, false);
return transformReadyReply(lastModificationsFuture);
}
}
protected Future<Object> sendBatchedModifications() {
- return sendBatchedModifications(false);
+ return sendBatchedModifications(false, false);
}
- protected Future<Object> sendBatchedModifications(boolean ready) {
+ protected Future<Object> sendBatchedModifications(boolean ready, boolean doCommitOnReady) {
Future<Object> sent = null;
if(ready || (batchedModifications != null && !batchedModifications.getModifications().isEmpty())) {
if(batchedModifications == null) {
}
batchedModifications.setReady(ready);
+ batchedModifications.setDoCommitOnReady(doCommitOnReady);
batchedModifications.setTotalMessagesSent(++totalBatchedModificationsSent);
sent = executeOperationAsync(batchedModifications);
}
@Override
- public AbstractThreePhaseCommitCohort ready() {
+ public AbstractThreePhaseCommitCohort<?> ready() {
Preconditions.checkState(transactionType != TransactionType.READ_ONLY,
"Read-only transactions cannot be readied");
throttleOperation(txFutureCallbackMap.size());
+ final boolean isSingleShard = txFutureCallbackMap.size() == 1;
+ return isSingleShard ? createSingleCommitCohort() : createMultiCommitCohort();
+ }
+
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ private AbstractThreePhaseCommitCohort<Object> createSingleCommitCohort() {
+ TransactionFutureCallback txFutureCallback = txFutureCallbackMap.values().iterator().next();
+
+ LOG.debug("Tx {} Readying transaction for shard {} on chain {}", getIdentifier(),
+ txFutureCallback.getShardName(), transactionChainId);
+
+ final OperationCallback.Reference operationCallbackRef =
+ new OperationCallback.Reference(OperationCallback.NO_OP_CALLBACK);
+ final TransactionContext transactionContext = txFutureCallback.getTransactionContext();
+ final Future future;
+ if (transactionContext != null) {
+ // avoid the creation of a promise and a TransactionOperation
+ future = getReadyOrDirectCommitFuture(transactionContext, operationCallbackRef);
+ } else {
+ final Promise promise = akka.dispatch.Futures.promise();
+ txFutureCallback.enqueueTransactionOperation(new TransactionOperation() {
+ @Override
+ public void invoke(TransactionContext transactionContext) {
+ promise.completeWith(getReadyOrDirectCommitFuture(transactionContext, operationCallbackRef));
+ }
+ });
+ future = promise.future();
+ }
+
+ return new SingleCommitCohortProxy(actorContext, future, getIdentifier().toString(), operationCallbackRef);
+ }
+
+ private Future<?> getReadyOrDirectCommitFuture(TransactionContext transactionContext,
+ OperationCallback.Reference operationCallbackRef) {
+ if(transactionContext.supportsDirectCommit()) {
+ TransactionRateLimitingCallback rateLimitingCallback = new TransactionRateLimitingCallback(actorContext);
+ operationCallbackRef.set(rateLimitingCallback);
+ rateLimitingCallback.run();
+ return transactionContext.directCommit();
+ } else {
+ return transactionContext.readyTransaction();
+ }
+ }
+
+ private AbstractThreePhaseCommitCohort<ActorSelection> createMultiCommitCohort() {
List<Future<ActorSelection>> cohortFutures = new ArrayList<>(txFutureCallbackMap.size());
for(TransactionFutureCallback txFutureCallback : txFutureCallbackMap.values()) {
- LOG.debug("Tx {} Readying transaction for shard {} chain {}", getIdentifier(),
+ LOG.debug("Tx {} Readying transaction for shard {} on chain {}", getIdentifier(),
txFutureCallback.getShardName(), transactionChainId);
final TransactionContext transactionContext = txFutureCallback.getTransactionContext();
cohortFutures.add(future);
}
- return new ThreePhaseCommitCohortProxy(actorContext, cohortFutures,
- getIdentifier().toString());
+ return new ThreePhaseCommitCohortProxy(actorContext, cohortFutures, getIdentifier().toString());
}
@Override
return readyTxReply.getCohortPath();
}
+
+ @Override
+ public boolean supportsDirectCommit() {
+ return false;
+ }
+
+ @Override
+ public Future<Object> directCommit() {
+ throw new UnsupportedOperationException("directCommit is not supported for " + getClass());
+ }
}
private static final long serialVersionUID = 1L;
private boolean ready;
+ private boolean doCommitOnReady;
private int totalMessagesSent;
private String transactionID;
private String transactionChainID;
this.ready = ready;
}
+ public boolean isDoCommitOnReady() {
+ return doCommitOnReady;
+ }
+
+ public void setDoCommitOnReady(boolean doCommitOnReady) {
+ this.doCommitOnReady = doCommitOnReady;
+ }
+
public int getTotalMessagesSent() {
return totalMessagesSent;
}
transactionChainID = in.readUTF();
ready = in.readBoolean();
totalMessagesSent = in.readInt();
+ doCommitOnReady = in.readBoolean();
}
@Override
out.writeUTF(transactionChainID);
out.writeBoolean(ready);
out.writeInt(totalMessagesSent);
+ out.writeBoolean(doCommitOnReady);
}
@Override
private final DOMStoreThreePhaseCommitCohort cohort;
private final Modification modification;
private final boolean returnSerialized;
+ private final boolean doImmediateCommit;
private final short txnClientVersion;
public ForwardedReadyTransaction(String transactionID, short txnClientVersion,
DOMStoreThreePhaseCommitCohort cohort, Modification modification,
- boolean returnSerialized) {
+ boolean returnSerialized, boolean doImmediateCommit) {
this.transactionID = transactionID;
this.cohort = cohort;
this.modification = modification;
this.returnSerialized = returnSerialized;
this.txnClientVersion = txnClientVersion;
+ this.doImmediateCommit = doImmediateCommit;
}
public String getTransactionID() {
public short getTxnClientVersion() {
return txnClientVersion;
}
+
+ public boolean isDoImmediateCommit() {
+ return doImmediateCommit;
+ }
}
doAnswer(new Answer<ListenableFuture<Void>>() {
@Override
public ListenableFuture<Void> answer(final InvocationOnMock invocation) throws Throwable {
- return actual.preCommit();
+ if(preCommit != null) {
+ return preCommit.apply(actual);
+ } else {
+ return actual.preCommit();
+ }
}
}).when(cohort).preCommit();
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.argThat;
import akka.actor.Props;
import akka.dispatch.Futures;
import akka.testkit.JavaTestKit;
+import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.Timer;
+import com.google.common.base.Objects;
+import com.google.common.base.Optional;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.CheckedFuture;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Iterator;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.junit.AfterClass;
import org.junit.Before;
import org.opendaylight.controller.cluster.datastore.TransactionProxyTest.TestException;
import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
+import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
import org.opendaylight.controller.cluster.datastore.messages.DataExists;
import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
import org.opendaylight.controller.cluster.datastore.modification.Modification;
import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
import org.opendaylight.controller.cluster.datastore.shardstrategy.DefaultShardStrategy;
+import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategy;
import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor;
private static ActorSystem system;
- private final Configuration configuration = new MockConfiguration();
+ private final Configuration configuration = new MockConfiguration() {
+ @Override
+ public Map<String, ShardStrategy> getModuleNameToShardStrategyMap() {
+ return ImmutableMap.<String, ShardStrategy>builder().put(
+ "junk", new ShardStrategy() {
+ @Override
+ public String findShard(YangInstanceIdentifier path) {
+ return "junk";
+ }
+ }).build();
+ }
+
+ @Override
+ public Optional<String> getModuleNameFromNameSpace(String nameSpace) {
+ return TestModel.JUNK_QNAME.getNamespace().toASCIIString().equals(nameSpace) ?
+ Optional.of("junk") : Optional.<String>absent();
+ }
+ };
@Mock
protected ActorContext mockActorContext;
doReturn(dataStoreContextBuilder.build()).when(mockActorContext).getDatastoreContext();
doReturn(10).when(mockActorContext).getTransactionOutstandingOperationLimit();
+ Timer timer = new MetricRegistry().timer("test");
+ doReturn(timer).when(mockActorContext).getOperationTimer(any(String.class));
+
ShardStrategyFactory.setConfiguration(configuration);
}
}
protected void expectBatchedModificationsReady(ActorRef actorRef) {
- doReturn(readyTxReply(actorRef.path().toString())).when(mockActorContext).executeOperationAsync(
- eq(actorSelection(actorRef)), isA(BatchedModifications.class));
+ expectBatchedModificationsReady(actorRef, false);
+ }
+
+ protected void expectBatchedModificationsReady(ActorRef actorRef, boolean doCommitOnReady) {
+ doReturn(doCommitOnReady ? Futures.successful(new CommitTransactionReply().toSerializable()) :
+ readyTxReply(actorRef.path().toString())).when(mockActorContext).executeOperationAsync(
+ eq(actorSelection(actorRef)), isA(BatchedModifications.class));
}
protected void expectBatchedModifications(int count) {
}
protected ActorRef setupActorContextWithoutInitialCreateTransaction(ActorSystem actorSystem) {
+ return setupActorContextWithoutInitialCreateTransaction(actorSystem, DefaultShardStrategy.DEFAULT_SHARD);
+ }
+
+ protected ActorRef setupActorContextWithoutInitialCreateTransaction(ActorSystem actorSystem, String shardName) {
ActorRef actorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
log.info("Created mock shard actor {}", actorRef);
when(mockActorContext).actorSelection(actorRef.path().toString());
doReturn(Futures.successful(actorSystem.actorSelection(actorRef.path()))).
- when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
+ when(mockActorContext).findPrimaryShardAsync(eq(shardName));
doReturn(false).when(mockActorContext).isPathLocal(actorRef.path().toString());
}
protected ActorRef setupActorContextWithInitialCreateTransaction(ActorSystem actorSystem,
- TransactionType type, int transactionVersion) {
- ActorRef shardActorRef = setupActorContextWithoutInitialCreateTransaction(actorSystem);
+ TransactionType type, int transactionVersion, String shardName) {
+ ActorRef shardActorRef = setupActorContextWithoutInitialCreateTransaction(actorSystem, shardName);
return setupActorContextWithInitialCreateTransaction(actorSystem, type, transactionVersion,
memberName, shardActorRef);
}
protected ActorRef setupActorContextWithInitialCreateTransaction(ActorSystem actorSystem, TransactionType type) {
- return setupActorContextWithInitialCreateTransaction(actorSystem, type, DataStoreVersions.CURRENT_VERSION);
+ return setupActorContextWithInitialCreateTransaction(actorSystem, type, DataStoreVersions.CURRENT_VERSION,
+ DefaultShardStrategy.DEFAULT_SHARD);
}
+ protected ActorRef setupActorContextWithInitialCreateTransaction(ActorSystem actorSystem, TransactionType type,
+ String shardName) {
+ return setupActorContextWithInitialCreateTransaction(actorSystem, type, DataStoreVersions.CURRENT_VERSION,
+ shardName);
+ }
protected void propagateReadFailedExceptionCause(CheckedFuture<?, ReadFailedException> future)
throws Throwable {
List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
assertEquals("Captured BatchedModifications count", 1, batchedModifications.size());
- verifyBatchedModifications(batchedModifications.get(0), expIsReady, expected);
+ verifyBatchedModifications(batchedModifications.get(0), expIsReady, expIsReady, expected);
}
protected void verifyBatchedModifications(Object message, boolean expIsReady, Modification... expected) {
+ verifyBatchedModifications(message, expIsReady, false, expected);
+ }
+
+ protected void verifyBatchedModifications(Object message, boolean expIsReady, boolean expIsDoCommitOnReady,
+ Modification... expected) {
assertEquals("Message type", BatchedModifications.class, message.getClass());
BatchedModifications batchedModifications = (BatchedModifications)message;
assertEquals("BatchedModifications size", expected.length, batchedModifications.getModifications().size());
assertEquals("isReady", expIsReady, batchedModifications.isReady());
+ assertEquals("isDoCommitOnReady", expIsDoCommitOnReady, batchedModifications.isDoCommitOnReady());
for(int i = 0; i < batchedModifications.getModifications().size(); i++) {
Modification actual = batchedModifications.getModifications().get(i);
assertEquals("Modification type", expected[i].getClass(), actual.getClass());
}
}
- protected void verifyCohortFutures(ThreePhaseCommitCohortProxy proxy,
+ protected void verifyCohortFutures(AbstractThreePhaseCommitCohort<?> proxy,
Object... expReplies) throws Exception {
assertEquals("getReadyOperationFutures size", expReplies.length,
proxy.getCohortFutures().size());
- int i = 0;
- for( Future<ActorSelection> future: proxy.getCohortFutures()) {
+ List<Object> futureResults = new ArrayList<>();
+ for( Future<?> future: proxy.getCohortFutures()) {
assertNotNull("Ready operation Future is null", future);
+ try {
+ futureResults.add(Await.result(future, Duration.create(5, TimeUnit.SECONDS)));
+ } catch(Exception e) {
+ futureResults.add(e);
+ }
+ }
- Object expReply = expReplies[i++];
- if(expReply instanceof ActorSelection) {
- ActorSelection actual = Await.result(future, Duration.create(5, TimeUnit.SECONDS));
- assertEquals("Cohort actor path", expReply, actual);
- } else {
- try {
- Await.result(future, Duration.create(5, TimeUnit.SECONDS));
- fail("Expected exception from ready operation Future");
- } catch(Exception e) {
- assertTrue(String.format("Expected exception type %s. Actual %s",
- expReply, e.getClass()), ((Class<?>)expReply).isInstance(e));
+ for(int i = 0; i < expReplies.length; i++) {
+ Object expReply = expReplies[i];
+ boolean found = false;
+ Iterator<?> iter = futureResults.iterator();
+ while(iter.hasNext()) {
+ Object actual = iter.next();
+ if(CommitTransactionReply.SERIALIZABLE_CLASS.isInstance(expReply) &&
+ CommitTransactionReply.SERIALIZABLE_CLASS.isInstance(actual)) {
+ found = true;
+ } else if(expReply instanceof ActorSelection && Objects.equal(expReply, actual)) {
+ found = true;
+ } else if(expReply instanceof Class && ((Class<?>)expReply).isInstance(actual)) {
+ found = true;
}
+
+ if(found) {
+ iter.remove();
+ break;
+ }
+ }
+
+ if(!found) {
+ fail(String.format("No cohort Future response found for %s. Actual: %s", expReply, futureResults));
}
}
}
}};
}
- private void testTransactionWritesWithShardNotInitiallyReady(final boolean writeOnly) throws Exception {
+ private void testTransactionWritesWithShardNotInitiallyReady(final String testName,
+ final boolean writeOnly) throws Exception {
new IntegrationTestKit(getSystem()) {{
- String testName = "testTransactionWritesWithShardNotInitiallyReady";
String shardName = "test-1";
// Setup the InMemoryJournal to block shard recovery to ensure the shard isn't
@Test
public void testWriteOnlyTransactionWithShardNotInitiallyReady() throws Exception {
datastoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
- testTransactionWritesWithShardNotInitiallyReady(true);
+ testTransactionWritesWithShardNotInitiallyReady("testWriteOnlyTransactionWithShardNotInitiallyReady", true);
}
@Test
public void testReadWriteTransactionWithShardNotInitiallyReady() throws Exception {
- testTransactionWritesWithShardNotInitiallyReady(false);
+ testTransactionWritesWithShardNotInitiallyReady("testReadWriteTransactionWithShardNotInitiallyReady", false);
}
@Test
}
void doCommit(final DOMStoreThreePhaseCommitCohort cohort) throws Exception {
- Boolean canCommit = cohort.canCommit().get(5, TimeUnit.SECONDS);
+ Boolean canCommit = cohort.canCommit().get(7, TimeUnit.SECONDS);
assertEquals("canCommit", true, canCommit);
cohort.preCommit().get(5, TimeUnit.SECONDS);
cohort.commit().get(5, TimeUnit.SECONDS);
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.reset;
import static org.opendaylight.controller.cluster.datastore.DataStoreVersions.CURRENT_VERSION;
import akka.actor.ActorRef;
import akka.actor.ActorSelection;
// by the ShardTransaction.
shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
- cohort1, modification1, true), getRef());
+ cohort1, modification1, true, false), getRef());
ReadyTransactionReply readyReply = ReadyTransactionReply.fromSerializable(
expectMsgClass(duration, ReadyTransactionReply.class));
assertEquals("Cohort path", shard.path().toString(), readyReply.getCohortPath());
// Send the ForwardedReadyTransaction for the next 2 Tx's.
shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
- cohort2, modification2, true), getRef());
+ cohort2, modification2, true, false), getRef());
expectMsgClass(duration, ReadyTransactionReply.class);
shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
- cohort3, modification3, true), getRef());
+ cohort3, modification3, true, false), getRef());
expectMsgClass(duration, ReadyTransactionReply.class);
// Send the CanCommitTransaction message for the next 2 Tx's. These should get queued and
// Verify data in the data store.
- NormalizedNode<?, ?> outerList = readStore(shard, TestModel.OUTER_LIST_PATH);
- assertNotNull(TestModel.OUTER_LIST_QNAME.getLocalName() + " not found", outerList);
- assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " value is not Iterable",
- outerList.getValue() instanceof Iterable);
- Object entry = ((Iterable<Object>)outerList.getValue()).iterator().next();
- assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " entry is not MapEntryNode",
- entry instanceof MapEntryNode);
- MapEntryNode mapEntry = (MapEntryNode)entry;
- Optional<DataContainerChild<? extends PathArgument, ?>> idLeaf =
- mapEntry.getChild(new YangInstanceIdentifier.NodeIdentifier(TestModel.ID_QNAME));
- assertTrue("Missing leaf " + TestModel.ID_QNAME.getLocalName(), idLeaf.isPresent());
- assertEquals(TestModel.ID_QNAME.getLocalName() + " value", 1, idLeaf.get().getValue());
+ verifyOuterListEntry(shard, 1);
verifyLastApplied(shard, 2);
}
private BatchedModifications newBatchedModifications(String transactionID, YangInstanceIdentifier path,
- NormalizedNode<?, ?> data, boolean ready) {
- return newBatchedModifications(transactionID, null, path, data, ready);
+ NormalizedNode<?, ?> data, boolean ready, boolean doCommitOnReady) {
+ return newBatchedModifications(transactionID, null, path, data, ready, doCommitOnReady);
}
private BatchedModifications newBatchedModifications(String transactionID, String transactionChainID,
- YangInstanceIdentifier path, NormalizedNode<?, ?> data, boolean ready) {
+ YangInstanceIdentifier path, NormalizedNode<?, ?> data, boolean ready, boolean doCommitOnReady) {
BatchedModifications batched = new BatchedModifications(transactionID, CURRENT_VERSION, transactionChainID);
batched.addModification(new WriteModification(path, data));
batched.setReady(ready);
+ batched.setDoCommitOnReady(doCommitOnReady);
return batched;
}
- @SuppressWarnings("unchecked")
@Test
- public void testMultipleBatchedModifications() throws Throwable {
+ public void testBatchedModificationsWithNoCommitOnReady() throws Throwable {
new ShardTestKit(getSystem()) {{
final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
- "testMultipleBatchedModifications");
+ "testBatchedModificationsWithNoCommitOnReady");
waitUntilLeader(shard);
// Send a BatchedModifications to start a transaction.
shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH,
- ImmutableNodes.containerNode(TestModel.TEST_QNAME), false), getRef());
+ ImmutableNodes.containerNode(TestModel.TEST_QNAME), false, false), getRef());
expectMsgClass(duration, BatchedModificationsReply.class);
// Send a couple more BatchedModifications.
shard.tell(newBatchedModifications(transactionID, TestModel.OUTER_LIST_PATH,
- ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), false), getRef());
+ ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), false, false), getRef());
expectMsgClass(duration, BatchedModificationsReply.class);
shard.tell(newBatchedModifications(transactionID, YangInstanceIdentifier.builder(
TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
- ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true), getRef());
+ ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true, false), getRef());
expectMsgClass(duration, ReadyTransactionReply.class);
// Send the CanCommitTransaction message.
// Verify data in the data store.
- NormalizedNode<?, ?> outerList = readStore(shard, TestModel.OUTER_LIST_PATH);
- assertNotNull(TestModel.OUTER_LIST_QNAME.getLocalName() + " not found", outerList);
- assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " value is not Iterable",
- outerList.getValue() instanceof Iterable);
- Object entry = ((Iterable<Object>)outerList.getValue()).iterator().next();
- assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " entry is not MapEntryNode",
- entry instanceof MapEntryNode);
- MapEntryNode mapEntry = (MapEntryNode)entry;
- Optional<DataContainerChild<? extends PathArgument, ?>> idLeaf =
- mapEntry.getChild(new YangInstanceIdentifier.NodeIdentifier(TestModel.ID_QNAME));
- assertTrue("Missing leaf " + TestModel.ID_QNAME.getLocalName(), idLeaf.isPresent());
- assertEquals(TestModel.ID_QNAME.getLocalName() + " value", 1, idLeaf.get().getValue());
+ verifyOuterListEntry(shard, 1);
shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
}};
}
+ @Test
+ public void testBatchedModificationsWithCommitOnReady() throws Throwable {
+ new ShardTestKit(getSystem()) {{
+ final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
+ newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
+ "testBatchedModificationsWithCommitOnReady");
+
+ waitUntilLeader(shard);
+
+ final String transactionID = "tx";
+ FiniteDuration duration = duration("5 seconds");
+
+ final AtomicReference<DOMStoreThreePhaseCommitCohort> mockCohort = new AtomicReference<>();
+ ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() {
+ @Override
+ public DOMStoreThreePhaseCommitCohort decorate(String txID, DOMStoreThreePhaseCommitCohort actual) {
+ if(mockCohort.get() == null) {
+ mockCohort.set(createDelegatingMockCohort("cohort", actual));
+ }
+
+ return mockCohort.get();
+ }
+ };
+
+ shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator);
+
+ // Send a BatchedModifications to start a transaction.
+
+ shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH,
+ ImmutableNodes.containerNode(TestModel.TEST_QNAME), false, false), getRef());
+ expectMsgClass(duration, BatchedModificationsReply.class);
+
+ // Send a couple more BatchedModifications.
+
+ shard.tell(newBatchedModifications(transactionID, TestModel.OUTER_LIST_PATH,
+ ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), false, false), getRef());
+ expectMsgClass(duration, BatchedModificationsReply.class);
+
+ shard.tell(newBatchedModifications(transactionID, YangInstanceIdentifier.builder(
+ TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
+ ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true, true), getRef());
+
+ expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
+
+ InOrder inOrder = inOrder(mockCohort.get());
+ inOrder.verify(mockCohort.get()).canCommit();
+ inOrder.verify(mockCohort.get()).preCommit();
+ inOrder.verify(mockCohort.get()).commit();
+
+ // Verify data in the data store.
+
+ verifyOuterListEntry(shard, 1);
+
+ shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
+ }};
+ }
+
+ @SuppressWarnings("unchecked")
+ private void verifyOuterListEntry(final TestActorRef<Shard> shard, Object expIDValue) throws Exception {
+ NormalizedNode<?, ?> outerList = readStore(shard, TestModel.OUTER_LIST_PATH);
+ assertNotNull(TestModel.OUTER_LIST_QNAME.getLocalName() + " not found", outerList);
+ assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " value is not Iterable",
+ outerList.getValue() instanceof Iterable);
+ Object entry = ((Iterable<Object>)outerList.getValue()).iterator().next();
+ assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " entry is not MapEntryNode",
+ entry instanceof MapEntryNode);
+ MapEntryNode mapEntry = (MapEntryNode)entry;
+ Optional<DataContainerChild<? extends PathArgument, ?>> idLeaf =
+ mapEntry.getChild(new YangInstanceIdentifier.NodeIdentifier(TestModel.ID_QNAME));
+ assertTrue("Missing leaf " + TestModel.ID_QNAME.getLocalName(), idLeaf.isPresent());
+ assertEquals(TestModel.ID_QNAME.getLocalName() + " value", expIDValue, idLeaf.get().getValue());
+ }
+
@Test
public void testBatchedModificationsOnTransactionChain() throws Throwable {
new ShardTestKit(getSystem()) {{
ContainerNode containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
YangInstanceIdentifier path = TestModel.TEST_PATH;
shard.tell(newBatchedModifications(transactionID1, transactionChainID, path,
- containerNode, true), getRef());
+ containerNode, true, false), getRef());
expectMsgClass(duration, ReadyTransactionReply.class);
// Create a read Tx on the same chain.
}};
}
+ @Test
+ public void testForwardedReadyTransactionWithImmediateCommit() throws Exception{
+ new ShardTestKit(getSystem()) {{
+ final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
+ newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
+ "testForwardedReadyTransactionWithImmediateCommit");
+
+ waitUntilLeader(shard);
+
+ InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
+
+ String transactionID = "tx1";
+ MutableCompositeModification modification = new MutableCompositeModification();
+ NormalizedNode<?, ?> containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+ DOMStoreThreePhaseCommitCohort cohort = setupMockWriteTransaction("cohort", dataStore,
+ TestModel.TEST_PATH, containerNode, modification);
+
+ FiniteDuration duration = duration("5 seconds");
+
+ // Simulate the ForwardedReadyTransaction messages that would be sent
+ // by the ShardTransaction.
+
+ shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
+ cohort, modification, true, true), getRef());
+
+ expectMsgClass(duration, ThreePhaseCommitCohortMessages.CommitTransactionReply.class);
+
+ InOrder inOrder = inOrder(cohort);
+ inOrder.verify(cohort).canCommit();
+ inOrder.verify(cohort).preCommit();
+ inOrder.verify(cohort).commit();
+
+ NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.TEST_PATH);
+ assertEquals(TestModel.TEST_QNAME.getLocalName(), containerNode, actualNode);
+
+ shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
+ }};
+ }
+
@Test
public void testCommitWithPersistenceDisabled() throws Throwable {
dataStoreContextBuilder.persistent(false);
// by the ShardTransaction.
shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
- cohort, modification, true), getRef());
+ cohort, modification, true, false), getRef());
expectMsgClass(duration, ReadyTransactionReply.class);
// Send the CanCommitTransaction message.
// by the ShardTransaction.
shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
- cohort, modification, true), getRef());
+ cohort, modification, true, false), getRef());
expectMsgClass(duration, ReadyTransactionReply.class);
// Send the CanCommitTransaction message.
// by the ShardTransaction.
shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
- cohort, modification, true), getRef());
+ cohort, modification, true, false), getRef());
expectMsgClass(duration, ReadyTransactionReply.class);
// Send the CanCommitTransaction message.
waitUntilLeader(shard);
- // Setup 2 simulated transactions with mock cohorts. The first one fails in the
+ // Setup 2 simulated transactions with mock cohorts. The first one fails in the
// commit phase.
String transactionID1 = "tx1";
// by the ShardTransaction.
shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
- cohort1, modification1, true), getRef());
+ cohort1, modification1, true, false), getRef());
expectMsgClass(duration, ReadyTransactionReply.class);
shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
- cohort2, modification2, true), getRef());
+ cohort2, modification2, true, false), getRef());
expectMsgClass(duration, ReadyTransactionReply.class);
// Send the CanCommitTransaction message for the first Tx.
waitUntilLeader(shard);
- String transactionID = "tx1";
- MutableCompositeModification modification = new MutableCompositeModification();
- DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
- doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
- doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).preCommit();
+ String transactionID1 = "tx1";
+ MutableCompositeModification modification1 = new MutableCompositeModification();
+ DOMStoreThreePhaseCommitCohort cohort1 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
+ doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
+ doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort1).preCommit();
+
+ String transactionID2 = "tx2";
+ MutableCompositeModification modification2 = new MutableCompositeModification();
+ DOMStoreThreePhaseCommitCohort cohort2 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort2");
+ doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
FiniteDuration duration = duration("5 seconds");
+ final Timeout timeout = new Timeout(duration);
// Simulate the ForwardedReadyTransaction messages that would be sent
// by the ShardTransaction.
- shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
- cohort, modification, true), getRef());
+ shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
+ cohort1, modification1, true, false), getRef());
expectMsgClass(duration, ReadyTransactionReply.class);
- // Send the CanCommitTransaction message.
+ shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
+ cohort2, modification2, true, false), getRef());
+ expectMsgClass(duration, ReadyTransactionReply.class);
- shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
+ // Send the CanCommitTransaction message for the first Tx.
+
+ shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
assertEquals("Can commit", true, canCommitReply.getCanCommit());
- // Send the CommitTransaction message. This should send back an error
- // for preCommit failure.
+ // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
+ // processed after the first Tx completes.
- shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
+ Future<Object> canCommitFuture = Patterns.ask(shard,
+ new CanCommitTransaction(transactionID2).toSerializable(), timeout);
+
+ // Send the CommitTransaction message for the first Tx. This should send back an error
+ // and trigger the 2nd Tx to proceed.
+
+ shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
expectMsgClass(duration, akka.actor.Status.Failure.class);
- InOrder inOrder = inOrder(cohort);
- inOrder.verify(cohort).canCommit();
- inOrder.verify(cohort).preCommit();
+ // Wait for the 2nd Tx to complete the canCommit phase.
+
+ final CountDownLatch latch = new CountDownLatch(1);
+ canCommitFuture.onComplete(new OnComplete<Object>() {
+ @Override
+ public void onComplete(final Throwable t, final Object resp) {
+ latch.countDown();
+ }
+ }, getSystem().dispatcher());
+
+ assertEquals("2nd CanCommit complete", true, latch.await(5, TimeUnit.SECONDS));
+
+ InOrder inOrder = inOrder(cohort1, cohort2);
+ inOrder.verify(cohort1).canCommit();
+ inOrder.verify(cohort1).preCommit();
+ inOrder.verify(cohort2).canCommit();
shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
}};
final FiniteDuration duration = duration("5 seconds");
- String transactionID = "tx1";
+ String transactionID1 = "tx1";
MutableCompositeModification modification = new MutableCompositeModification();
DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).canCommit();
// Simulate the ForwardedReadyTransaction messages that would be sent
// by the ShardTransaction.
- shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
- cohort, modification, true), getRef());
+ shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
+ cohort, modification, true, false), getRef());
expectMsgClass(duration, ReadyTransactionReply.class);
// Send the CanCommitTransaction message.
- shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
+ shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
expectMsgClass(duration, akka.actor.Status.Failure.class);
+ // Send another can commit to ensure the failed one got cleaned up.
+
+ reset(cohort);
+
+ String transactionID2 = "tx2";
+ doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
+
+ shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
+ cohort, modification, true, false), getRef());
+ expectMsgClass(duration, ReadyTransactionReply.class);
+
+ shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
+ CanCommitTransactionReply reply = CanCommitTransactionReply.fromSerializable(
+ expectMsgClass(CanCommitTransactionReply.SERIALIZABLE_CLASS));
+ assertEquals("getCanCommit", true, reply.getCanCommit());
+
+ shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
+ }};
+ }
+
+ @Test
+ public void testCanCommitPhaseFalseResponse() throws Throwable {
+ new ShardTestKit(getSystem()) {{
+ final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
+ newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
+ "testCanCommitPhaseFalseResponse");
+
+ waitUntilLeader(shard);
+
+ final FiniteDuration duration = duration("5 seconds");
+
+ String transactionID1 = "tx1";
+ MutableCompositeModification modification = new MutableCompositeModification();
+ DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
+ doReturn(Futures.immediateFuture(Boolean.FALSE)).when(cohort).canCommit();
+
+ // Simulate the ForwardedReadyTransaction messages that would be sent
+ // by the ShardTransaction.
+
+ shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
+ cohort, modification, true, false), getRef());
+ expectMsgClass(duration, ReadyTransactionReply.class);
+
+ // Send the CanCommitTransaction message.
+
+ shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
+ CanCommitTransactionReply reply = CanCommitTransactionReply.fromSerializable(
+ expectMsgClass(CanCommitTransactionReply.SERIALIZABLE_CLASS));
+ assertEquals("getCanCommit", false, reply.getCanCommit());
+
+ // Send another can commit to ensure the failed one got cleaned up.
+
+ reset(cohort);
+
+ String transactionID2 = "tx2";
+ doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
+
+ shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
+ cohort, modification, true, false), getRef());
+ expectMsgClass(duration, ReadyTransactionReply.class);
+
+ shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
+ reply = CanCommitTransactionReply.fromSerializable(
+ expectMsgClass(CanCommitTransactionReply.SERIALIZABLE_CLASS));
+ assertEquals("getCanCommit", true, reply.getCanCommit());
+
+ shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
+ }};
+ }
+
+ @Test
+ public void testImmediateCommitWithCanCommitPhaseFailure() throws Throwable {
+ new ShardTestKit(getSystem()) {{
+ final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
+ newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
+ "testImmediateCommitWithCanCommitPhaseFailure");
+
+ waitUntilLeader(shard);
+
+ final FiniteDuration duration = duration("5 seconds");
+
+ String transactionID1 = "tx1";
+ MutableCompositeModification modification = new MutableCompositeModification();
+ DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
+ doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).canCommit();
+
+ // Simulate the ForwardedReadyTransaction messages that would be sent
+ // by the ShardTransaction.
+
+ shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
+ cohort, modification, true, true), getRef());
+
+ expectMsgClass(duration, akka.actor.Status.Failure.class);
+
+ // Send another can commit to ensure the failed one got cleaned up.
+
+ reset(cohort);
+
+ String transactionID2 = "tx2";
+ doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
+ doReturn(Futures.immediateFuture(null)).when(cohort).preCommit();
+ doReturn(Futures.immediateFuture(null)).when(cohort).commit();
+
+ shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
+ cohort, modification, true, true), getRef());
+
+ expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
+
+ shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
+ }};
+ }
+
+ @Test
+ public void testImmediateCommitWithCanCommitPhaseFalseResponse() throws Throwable {
+ new ShardTestKit(getSystem()) {{
+ final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
+ newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
+ "testImmediateCommitWithCanCommitPhaseFalseResponse");
+
+ waitUntilLeader(shard);
+
+ final FiniteDuration duration = duration("5 seconds");
+
+ String transactionID = "tx1";
+ MutableCompositeModification modification = new MutableCompositeModification();
+ DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
+ doReturn(Futures.immediateFuture(Boolean.FALSE)).when(cohort).canCommit();
+
+ // Simulate the ForwardedReadyTransaction messages that would be sent
+ // by the ShardTransaction.
+
+ shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
+ cohort, modification, true, true), getRef());
+
+ expectMsgClass(duration, akka.actor.Status.Failure.class);
+
+ // Send another can commit to ensure the failed one got cleaned up.
+
+ reset(cohort);
+
+ String transactionID2 = "tx2";
+ doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
+ doReturn(Futures.immediateFuture(null)).when(cohort).preCommit();
+ doReturn(Futures.immediateFuture(null)).when(cohort).commit();
+
+ shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
+ cohort, modification, true, true), getRef());
+
+ expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
+
shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
}};
}
modification, preCommit);
shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
- cohort, modification, true), getRef());
+ cohort, modification, true, false), getRef());
expectMsgClass(duration, ReadyTransactionReply.class);
shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
// Ready the Tx's
shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
- cohort1, modification1, true), getRef());
+ cohort1, modification1, true, false), getRef());
expectMsgClass(duration, ReadyTransactionReply.class);
shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
- cohort2, modification2, true), getRef());
+ cohort2, modification2, true, false), getRef());
expectMsgClass(duration, ReadyTransactionReply.class);
// canCommit 1st Tx. We don't send the commit so it should timeout.
shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
+ // Try to commit the 1st Tx - should fail as it's not the current Tx.
+
+ shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
+ expectMsgClass(duration, akka.actor.Status.Failure.class);
+
// Commit the 2nd Tx.
shard.tell(new CommitTransaction(transactionID2).toSerializable(), getRef());
// Ready the Tx's
shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
- cohort1, modification1, true), getRef());
+ cohort1, modification1, true, false), getRef());
expectMsgClass(duration, ReadyTransactionReply.class);
shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
- cohort2, modification2, true), getRef());
+ cohort2, modification2, true, false), getRef());
expectMsgClass(duration, ReadyTransactionReply.class);
shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
- cohort3, modification3, true), getRef());
+ cohort3, modification3, true, false), getRef());
expectMsgClass(duration, ReadyTransactionReply.class);
// canCommit 1st Tx.
// by the ShardTransaction.
shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
- cohort1, modification1, true), getRef());
+ cohort1, modification1, true, false), getRef());
expectMsgClass(duration, ReadyTransactionReply.class);
shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
- cohort2, modification2, true), getRef());
+ cohort2, modification2, true, false), getRef());
expectMsgClass(duration, ReadyTransactionReply.class);
// Send the CanCommitTransaction message for the first Tx.
import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionReply;
+import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.CreateSnapshot;
import org.opendaylight.controller.cluster.datastore.messages.DataExists;
import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
}
@Test
- public void testOnReceiveBatchedModificationsReady() throws Exception {
+ public void testOnReceiveBatchedModificationsReadyWithoutImmediateCommit() throws Exception {
new JavaTestKit(getSystem()) {{
final ActorRef transaction = newTransactionActor(store.newWriteOnlyTransaction(),
- "testOnReceiveBatchedModificationsReady");
+ "testOnReceiveBatchedModificationsReadyWithoutImmediateCommit");
JavaTestKit watcher = new JavaTestKit(getSystem());
watcher.watch(transaction);
}};
}
+ @Test
+ public void testOnReceiveBatchedModificationsReadyWithImmediateCommit() throws Exception {
+ new JavaTestKit(getSystem()) {{
+
+ final ActorRef transaction = newTransactionActor(store.newWriteOnlyTransaction(),
+ "testOnReceiveBatchedModificationsReadyWithImmediateCommit");
+
+ JavaTestKit watcher = new JavaTestKit(getSystem());
+ watcher.watch(transaction);
+
+ YangInstanceIdentifier writePath = TestModel.TEST_PATH;
+ NormalizedNode<?, ?> writeData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
+ new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
+ withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
+
+ BatchedModifications batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
+ batched.addModification(new WriteModification(writePath, writeData));
+ batched.setReady(true);
+ batched.setDoCommitOnReady(true);
+ batched.setTotalMessagesSent(1);
+
+ transaction.tell(batched, getRef());
+ expectMsgClass(duration("5 seconds"), CommitTransactionReply.SERIALIZABLE_CLASS);
+ watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
+ }};
+ }
+
@Test(expected=TestException.class)
public void testOnReceiveBatchedModificationsFailure() throws Throwable {
new JavaTestKit(getSystem()) {{
import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
+import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
@Test
public void testWriteAfterAsyncRead() throws Throwable {
- ActorRef actorRef = setupActorContextWithoutInitialCreateTransaction(getSystem());
+ ActorRef actorRef = setupActorContextWithoutInitialCreateTransaction(getSystem(), DefaultShardStrategy.DEFAULT_SHARD);
Promise<Object> createTxPromise = akka.dispatch.Futures.promise();
doReturn(createTxPromise).when(mockActorContext).executeOperationAsync(
doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync(
eq(actorSelection(actorRef)), eqSerializedReadData());
- expectBatchedModificationsReady(actorRef);
+ expectBatchedModificationsReady(actorRef, true);
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
- assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
-
- ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
+ assertTrue(ready instanceof SingleCommitCohortProxy);
- verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path()));
+ verifyCohortFutures((SingleCommitCohortProxy)ready, new CommitTransactionReply().toSerializable());
List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
assertEquals("Captured BatchedModifications count", 1, batchedModifications.size());
- verifyBatchedModifications(batchedModifications.get(0), true,
+ verifyBatchedModifications(batchedModifications.get(0), true, true,
new WriteModification(TestModel.TEST_PATH, nodeToWrite));
assertEquals("getTotalMessageCount", 1, batchedModifications.get(0).getTotalMessagesSent());
doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync(
eq(actorSelection(actorRef)), eqSerializedReadData());
- expectBatchedModificationsReady(actorRef);
+ expectBatchedModificationsReady(actorRef, true);
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
- assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
-
- ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
+ assertTrue(ready instanceof SingleCommitCohortProxy);
- verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path()));
+ verifyCohortFutures((SingleCommitCohortProxy)ready, new CommitTransactionReply().toSerializable());
List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
assertEquals("Captured BatchedModifications count", 1, batchedModifications.size());
- verifyBatchedModifications(batchedModifications.get(0), true);
+ verifyBatchedModifications(batchedModifications.get(0), true, true);
+ }
+
+ @Test
+ public void testReadyWithMultipleShardWrites() throws Exception {
+ ActorRef actorRef1 = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
+
+ ActorRef actorRef2 = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY, "junk");
+
+ expectBatchedModificationsReady(actorRef1);
+ expectBatchedModificationsReady(actorRef2);
+
+ TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
+
+ transactionProxy.write(TestModel.JUNK_PATH, ImmutableNodes.containerNode(TestModel.JUNK_QNAME));
+ transactionProxy.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
+
+ DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
+
+ assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
+
+ verifyCohortFutures((ThreePhaseCommitCohortProxy)ready, actorSelection(actorRef1),
+ actorSelection(actorRef2));
}
@Test
NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
- expectBatchedModificationsReady(actorRef);
+ expectBatchedModificationsReady(actorRef, true);
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
- assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
-
- ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
+ assertTrue(ready instanceof SingleCommitCohortProxy);
- verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path()));
+ verifyCohortFutures((SingleCommitCohortProxy)ready, new CommitTransactionReply().toSerializable());
List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
assertEquals("Captured BatchedModifications count", 1, batchedModifications.size());
- verifyBatchedModifications(batchedModifications.get(0), true,
+ verifyBatchedModifications(batchedModifications.get(0), true, true,
new WriteModification(TestModel.TEST_PATH, nodeToWrite));
verify(mockActorContext, never()).executeOperationAsync(eq(actorSelection(actorRef)),
NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
- expectBatchedModificationsReady(actorRef);
+ expectBatchedModificationsReady(actorRef, true);
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
- assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
-
- ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
+ assertTrue(ready instanceof SingleCommitCohortProxy);
- verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path()));
+ verifyCohortFutures((SingleCommitCohortProxy)ready, new CommitTransactionReply().toSerializable());
List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
assertEquals("Captured BatchedModifications count", 2, batchedModifications.size());
verifyBatchedModifications(batchedModifications.get(0), false,
new WriteModification(TestModel.TEST_PATH, nodeToWrite));
- verifyBatchedModifications(batchedModifications.get(1), true);
+ verifyBatchedModifications(batchedModifications.get(1), true, true);
verify(mockActorContext, never()).executeOperationAsync(eq(actorSelection(actorRef)),
isA(ReadyTransaction.SERIALIZABLE_CLASS));
DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
- assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
-
- ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
+ assertTrue(ready instanceof SingleCommitCohortProxy);
- verifyCohortFutures(proxy, TestException.class);
+ verifyCohortFutures((SingleCommitCohortProxy)ready, TestException.class);
}
private void testWriteOnlyTxWithFindPrimaryShardFailure(Exception toThrow) throws Exception {
DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
- assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
-
- ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
+ assertTrue(ready instanceof SingleCommitCohortProxy);
- verifyCohortFutures(proxy, toThrow.getClass());
+ verifyCohortFutures((SingleCommitCohortProxy)ready, toThrow.getClass());
}
@Test
@Test
public void testReadyWithInvalidReplyMessageType() throws Exception {
dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
- ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
-
- NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+ ActorRef actorRef1 = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
- //expectBatchedModifications(actorRef, 1);
+ ActorRef actorRef2 = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY, "junk");
doReturn(Futures.successful(new Object())).when(mockActorContext).
- executeOperationAsync(eq(actorSelection(actorRef)),
- isA(BatchedModifications.class));
+ executeOperationAsync(eq(actorSelection(actorRef1)), isA(BatchedModifications.class));
+
+ expectBatchedModificationsReady(actorRef2);
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
- transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
+ transactionProxy.write(TestModel.JUNK_PATH, ImmutableNodes.containerNode(TestModel.JUNK_QNAME));
+ transactionProxy.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
- ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
-
- verifyCohortFutures(proxy, IllegalArgumentException.class);
+ verifyCohortFutures((ThreePhaseCommitCohortProxy)ready, actorSelection(actorRef2),
+ IllegalArgumentException.class);
}
@Test
ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
doReturn(true).when(mockActorContext).isPathLocal(anyString());
- expectBatchedModificationsReady(actorRef);
+ expectBatchedModificationsReady(actorRef, true);
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
- assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
-
- ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
+ assertTrue(ready instanceof SingleCommitCohortProxy);
- verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path()));
+ verifyCohortFutures((SingleCommitCohortProxy)ready, new CommitTransactionReply().toSerializable());
}
private static interface TransactionProxyOperation {
verifyBatchedModifications(batchedModifications.get(1), false, new MergeModification(mergePath1, mergeNode1),
new MergeModification(mergePath2, mergeNode2), new WriteModification(writePath3, writeNode3));
- verifyBatchedModifications(batchedModifications.get(2), true, new MergeModification(mergePath3, mergeNode3),
- new DeleteModification(deletePath2));
+ verifyBatchedModifications(batchedModifications.get(2), true, true,
+ new MergeModification(mergePath3, mergeNode3), new DeleteModification(deletePath2));
assertEquals("getTotalMessageCount", 3, batchedModifications.get(2).getTotalMessagesSent());
}
// by the ShardTransaction.
shard.tell(new ForwardedReadyTransaction(transactionID1, HELIUM_2_VERSION,
- cohort1, modification1, true), getRef());
+ cohort1, modification1, true, false), getRef());
ReadyTransactionReply readyReply = ReadyTransactionReply.fromSerializable(
expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS));
assertEquals("Cohort path", shard.path().toString(), readyReply.getCohortPath());
// Send the ForwardedReadyTransaction for the next 2 Tx's.
shard.tell(new ForwardedReadyTransaction(transactionID2, HELIUM_2_VERSION,
- cohort2, modification2, true), getRef());
+ cohort2, modification2, true, false), getRef());
expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
shard.tell(new ForwardedReadyTransaction(transactionID3, HELIUM_2_VERSION,
- cohort3, modification3, true), getRef());
+ cohort3, modification3, true, false), getRef());
expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
// Send the CanCommitTransaction message for the next 2 Tx's. These should get queued and
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
import static org.mockito.Matchers.argThat;
import static org.mockito.Matchers.eq;
import static org.mockito.Matchers.isA;
import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.WRITE_ONLY;
import akka.actor.ActorRef;
import akka.dispatch.Futures;
+import akka.util.Timeout;
import com.google.common.base.Optional;
import java.util.concurrent.TimeUnit;
import org.junit.Ignore;
import org.junit.Test;
import org.mockito.ArgumentMatcher;
import org.mockito.Mockito;
+import org.opendaylight.controller.cluster.datastore.AbstractThreePhaseCommitCohort;
import org.opendaylight.controller.cluster.datastore.AbstractTransactionProxyTest;
import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
import org.opendaylight.controller.cluster.datastore.ThreePhaseCommitCohortProxy;
import org.opendaylight.controller.cluster.datastore.TransactionProxy;
+import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
+import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
+import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
+import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
import org.opendaylight.controller.cluster.datastore.messages.DeleteDataReply;
import org.opendaylight.controller.cluster.datastore.messages.MergeData;
import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.WriteData;
import org.opendaylight.controller.cluster.datastore.messages.WriteDataReply;
+import org.opendaylight.controller.cluster.datastore.shardstrategy.DefaultShardStrategy;
import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
+import org.opendaylight.controller.protobuff.messages.cohort3pc.ThreePhaseCommitCohortMessages;
import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
return argThat(matcher);
}
+ private CanCommitTransaction eqCanCommitTransaction(final String transactionID) {
+ ArgumentMatcher<CanCommitTransaction> matcher = new ArgumentMatcher<CanCommitTransaction>() {
+ @Override
+ public boolean matches(Object argument) {
+ return ThreePhaseCommitCohortMessages.CanCommitTransaction.class.equals(argument.getClass()) &&
+ CanCommitTransaction.fromSerializable(argument).getTransactionID().equals(transactionID);
+ }
+ };
+
+ return argThat(matcher);
+ }
+
+ private CommitTransaction eqCommitTransaction(final String transactionID) {
+ ArgumentMatcher<CommitTransaction> matcher = new ArgumentMatcher<CommitTransaction>() {
+ @Override
+ public boolean matches(Object argument) {
+ return ThreePhaseCommitCohortMessages.CommitTransaction.class.equals(argument.getClass()) &&
+ CommitTransaction.fromSerializable(argument).getTransactionID().equals(transactionID);
+ }
+ };
+
+ return argThat(matcher);
+ }
+
private Future<Object> readySerializedTxReply(String path, short version) {
return Futures.successful(new ReadyTransactionReply(path, version).toSerializable());
}
private ActorRef testCompatibilityWithHeliumVersion(short version) throws Exception {
- ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE, version);
+ ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE, version,
+ DefaultShardStrategy.DEFAULT_SHARD);
NormalizedNode<?, ?> testNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
transactionProxy.delete(TestModel.TEST_PATH);
- DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
+ AbstractThreePhaseCommitCohort<?> proxy = transactionProxy.ready();
- assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
+ verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path()));
- ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
+ doReturn(Futures.successful(CanCommitTransactionReply.YES.toSerializable())).when(mockActorContext).
+ executeOperationAsync(eq(actorSelection(actorRef)),
+ eqCanCommitTransaction(transactionProxy.getIdentifier().toString()), any(Timeout.class));
- verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path()));
+ doReturn(Futures.successful(new CommitTransactionReply().toSerializable())).when(mockActorContext).
+ executeOperationAsync(eq(actorSelection(actorRef)),
+ eqCommitTransaction(transactionProxy.getIdentifier().toString()), any(Timeout.class));
+
+ Boolean canCommit = proxy.canCommit().get(3, TimeUnit.SECONDS);
+ assertEquals("canCommit", true, canCommit.booleanValue());
+
+ proxy.preCommit().get(3, TimeUnit.SECONDS);
+
+ proxy.commit().get(3, TimeUnit.SECONDS);
return actorRef;
}
// creating transaction actors for write-only Tx's.
public void testWriteOnlyCompatibilityWithHeliumR2Version() throws Exception {
short version = DataStoreVersions.HELIUM_2_VERSION;
- ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY, version);
+ ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY, version,
+ DefaultShardStrategy.DEFAULT_SHARD);
NormalizedNode<?, ?> testNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);