private final TransactionIdentifier transactionIdentifier;
private long modificationCount = 0;
private boolean handOffComplete;
+ private final short transactionVersion;
protected AbstractTransactionContext(TransactionIdentifier transactionIdentifier) {
+ this(transactionIdentifier, DataStoreVersions.CURRENT_VERSION);
+ }
+
+ protected AbstractTransactionContext(TransactionIdentifier transactionIdentifier,
+ short transactionVersion) {
this.transactionIdentifier = transactionIdentifier;
+ this.transactionVersion = transactionVersion;
}
/**
public boolean usesOperationLimiting() {
return false;
}
+
+ @Override
+ public short getTransactionVersion() {
+ return transactionVersion;
+ }
}
private final ActorContext actorContext;
private final ActorSelection actor;
- private final short remoteTransactionVersion;
private final OperationLimiter limiter;
private BatchedModifications batchedModifications;
protected RemoteTransactionContext(TransactionIdentifier identifier, ActorSelection actor,
ActorContext actorContext, short remoteTransactionVersion, OperationLimiter limiter) {
- super(identifier);
+ super(identifier, remoteTransactionVersion);
this.limiter = Preconditions.checkNotNull(limiter);
this.actor = actor;
this.actorContext = actorContext;
- this.remoteTransactionVersion = remoteTransactionVersion;
}
private Future<Object> completeOperation(Future<Object> operationFuture){
return actorContext;
}
- protected short getRemoteTransactionVersion() {
- return remoteTransactionVersion;
- }
-
protected Future<Object> executeOperationAsync(SerializableMessage msg) {
return completeOperation(actorContext.executeOperationAsync(getActor(), msg.toSerializable()));
}
LOG.debug("Tx {} closeTransaction called", getIdentifier());
TransactionContextCleanup.untrack(this);
- actorContext.sendOperationAsync(getActor(), new CloseTransaction(remoteTransactionVersion).toSerializable());
+ actorContext.sendOperationAsync(getActor(), new CloseTransaction(getTransactionVersion()).toSerializable());
}
@Override
}
private BatchedModifications newBatchedModifications() {
- return new BatchedModifications(getIdentifier().toString(), remoteTransactionVersion, getIdentifier().getChainId());
+ return new BatchedModifications(getIdentifier().toString(), getTransactionVersion(),
+ getIdentifier().getChainId());
}
private void batchModification(Modification modification) {
}
};
- Future<Object> future = executeOperationAsync(readCmd.asVersion(remoteTransactionVersion));
+ Future<Object> future = executeOperationAsync(readCmd.asVersion(getTransactionVersion()));
future.onComplete(onComplete, actorContext.getClientDispatcher());
}
try {
cohortEntry.commit();
- sender.tell(CommitTransactionReply.instance(DataStoreVersions.CURRENT_VERSION).toSerializable(), getSelf());
+ sender.tell(CommitTransactionReply.instance(cohortEntry.getClientVersion()).toSerializable(), getSelf());
shardMBean.incrementCommittedTransactionCount();
shardMBean.setLastCommittedTransactionTime(System.currentTimeMillis());
LOG.error("{}: Failed to re-apply transaction {}", persistenceId(), transactionID, e);
}
- sender.tell(CommitTransactionReply.instance(DataStoreVersions.CURRENT_VERSION).toSerializable(), getSelf());
+ sender.tell(CommitTransactionReply.instance(cohortEntry.getClientVersion()).toSerializable(),
+ getSelf());
} else {
// This really shouldn't happen - it likely means that persistence or replication
// took so long to complete such that the cohort entry was expired from the cache.
ready.getTransactionID(), ready.getTxnClientVersion());
ShardDataTreeCohort cohort = ready.getTransaction().ready();
- CohortEntry cohortEntry = new CohortEntry(ready.getTransactionID(), cohort);
+ CohortEntry cohortEntry = new CohortEntry(ready.getTransactionID(), cohort, ready.getTxnClientVersion());
cohortCache.put(ready.getTransactionID(), cohortEntry);
if(!queueCohortEntry(cohortEntry, sender, shard)) {
if(cohortEntry == null) {
cohortEntry = new CohortEntry(batched.getTransactionID(),
dataTree.newReadWriteTransaction(batched.getTransactionID(),
- batched.getTransactionChainID()));
+ batched.getTransactionChainID()), batched.getVersion());
cohortCache.put(batched.getTransactionID(), cohortEntry);
}
void handleReadyLocalTransaction(ReadyLocalTransaction message, ActorRef sender, Shard shard) {
final ShardDataTreeCohort cohort = new SimpleShardDataTreeCohort(dataTree, message.getModification(),
message.getTransactionID());
- final CohortEntry cohortEntry = new CohortEntry(message.getTransactionID(), cohort);
+ final CohortEntry cohortEntry = new CohortEntry(message.getTransactionID(), cohort,
+ DataStoreVersions.CURRENT_VERSION);
cohortCache.put(message.getTransactionID(), cohortEntry);
cohortEntry.setDoImmediateCommit(message.isDoCommitOnReady());
} else {
// FIXME - use caller's version
cohortEntry.getReplySender().tell(
- canCommit ? CanCommitTransactionReply.yes(DataStoreVersions.CURRENT_VERSION).toSerializable() :
- CanCommitTransactionReply.no(DataStoreVersions.CURRENT_VERSION).toSerializable(), cohortEntry.getShard().self());
+ canCommit ? CanCommitTransactionReply.yes(cohortEntry.getClientVersion()).toSerializable() :
+ CanCommitTransactionReply.no(cohortEntry.getClientVersion()).toSerializable(),
+ cohortEntry.getShard().self());
}
} catch (Exception e) {
log.debug("{}: An exception occurred during canCommit", name, e);
shard.getShardMBean().incrementAbortTransactionsCount();
if(sender != null) {
- sender.tell(new AbortTransactionReply().toSerializable(), self);
+ sender.tell(AbortTransactionReply.instance(cohortEntry.getClientVersion()).toSerializable(), self);
}
} catch (Exception e) {
log.error("{}: An exception happened during abort", name, e);
private final Stopwatch lastAccessTimer = Stopwatch.createStarted();
private int totalBatchedModificationsReceived;
private boolean aborted;
+ private final short clientVersion;
- CohortEntry(String transactionID, ReadWriteShardDataTreeTransaction transaction) {
+ CohortEntry(String transactionID, ReadWriteShardDataTreeTransaction transaction, short clientVersion) {
this.transaction = Preconditions.checkNotNull(transaction);
this.transactionID = transactionID;
+ this.clientVersion = clientVersion;
}
- CohortEntry(String transactionID, ShardDataTreeCohort cohort) {
+ CohortEntry(String transactionID, ShardDataTreeCohort cohort, short clientVersion) {
this.transactionID = transactionID;
this.cohort = cohort;
this.transaction = null;
+ this.clientVersion = clientVersion;
}
void updateLastAccessTime() {
return transactionID;
}
+ short getClientVersion() {
+ return clientVersion;
+ }
+
DataTreeCandidate getCandidate() {
return cohort.getCandidate();
}
/**
* A cohort proxy implementation for a single-shard transaction commit. If the transaction was a direct commit
* to the shard, this implementation elides the CanCommitTransaction and CommitTransaction messages to the
- * shard as an optimization. Otherwise the 3-phase commit to the shard is delegated to a
- * ThreePhaseCommitCohortProxy instance (this is for backwards compatibility with pre-Lithium versions).
+ * shard as an optimization.
*
* @author Thomas Pantelis
*/
package org.opendaylight.controller.cluster.datastore;
import akka.actor.ActorSelection;
-import akka.dispatch.Futures;
import akka.dispatch.OnComplete;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Supplier;
import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
-import java.util.Collections;
+import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.Future;
-import scala.runtime.AbstractFunction1;
/**
* ThreePhaseCommitCohortProxy represents a set of remote cohort proxies
private static final Logger LOG = LoggerFactory.getLogger(ThreePhaseCommitCohortProxy.class);
+ private static final MessageSupplier COMMIT_MESSAGE_SUPPLIER = new MessageSupplier() {
+ @Override
+ public Object newMessage(String transactionId, short version) {
+ return new CommitTransaction(transactionId, version).toSerializable();
+ }
+
+ @Override
+ public boolean isSerializedReplyType(Object reply) {
+ return CommitTransactionReply.isSerializedType(reply);
+ }
+ };
+
+ private static final MessageSupplier ABORT_MESSAGE_SUPPLIER = new MessageSupplier() {
+ @Override
+ public Object newMessage(String transactionId, short version) {
+ return new AbortTransaction(transactionId, version).toSerializable();
+ }
+
+ @Override
+ public boolean isSerializedReplyType(Object reply) {
+ return AbortTransactionReply.isSerializedType(reply);
+ }
+ };
+
private final ActorContext actorContext;
- private final List<Future<ActorSelection>> cohortFutures;
- private volatile List<ActorSelection> cohorts;
+ private final List<CohortInfo> cohorts;
+ private final SettableFuture<Void> cohortsResolvedFuture = SettableFuture.create();
private final String transactionId;
private volatile OperationCallback commitOperationCallback;
- public ThreePhaseCommitCohortProxy(ActorContext actorContext,
- List<Future<ActorSelection>> cohortFutures, String transactionId) {
+ public ThreePhaseCommitCohortProxy(ActorContext actorContext, List<CohortInfo> cohorts, String transactionId) {
this.actorContext = actorContext;
- this.cohortFutures = cohortFutures;
+ this.cohorts = cohorts;
this.transactionId = transactionId;
- }
- private Future<Void> buildCohortList() {
+ if(cohorts.isEmpty()) {
+ cohortsResolvedFuture.set(null);
+ }
+ }
- Future<Iterable<ActorSelection>> combinedFutures = Futures.sequence(cohortFutures,
- actorContext.getClientDispatcher());
+ private ListenableFuture<Void> resolveCohorts() {
+ if(cohortsResolvedFuture.isDone()) {
+ return cohortsResolvedFuture;
+ }
- return combinedFutures.transform(new AbstractFunction1<Iterable<ActorSelection>, Void>() {
- @Override
- public Void apply(Iterable<ActorSelection> actorSelections) {
- cohorts = Lists.newArrayList(actorSelections);
- if(LOG.isDebugEnabled()) {
- LOG.debug("Tx {} successfully built cohort path list: {}",
- transactionId, cohorts);
+ final AtomicInteger completed = new AtomicInteger(cohorts.size());
+ for(final CohortInfo info: cohorts) {
+ info.getActorFuture().onComplete(new OnComplete<ActorSelection>() {
+ @Override
+ public void onComplete(Throwable failure, ActorSelection actor) {
+ synchronized(completed) {
+ boolean done = completed.decrementAndGet() == 0;
+ if(failure != null) {
+ LOG.debug("Tx {}: a cohort Future failed", transactionId, failure);
+ cohortsResolvedFuture.setException(failure);
+ } else if(!cohortsResolvedFuture.isDone()) {
+ LOG.debug("Tx {}: cohort actor {} resolved", transactionId, actor);
+
+ info.setResolvedActor(actor);
+ if(done) {
+ LOG.debug("Tx {}: successfully resolved all cohort actors", transactionId);
+ cohortsResolvedFuture.set(null);
+ }
+ }
+ }
}
- return null;
- }
- }, TransactionReadyReplyMapper.SAME_FAILURE_TRANSFORMER, actorContext.getClientDispatcher());
+ }, actorContext.getClientDispatcher());
+ }
+
+ return cohortsResolvedFuture;
}
@Override
public ListenableFuture<Boolean> canCommit() {
- if(LOG.isDebugEnabled()) {
- LOG.debug("Tx {} canCommit", transactionId);
- }
+ LOG.debug("Tx {} canCommit", transactionId);
+
final SettableFuture<Boolean> returnFuture = SettableFuture.create();
// The first phase of canCommit is to gather the list of cohort actor paths that will
// extracted from ReadyTransactionReply messages by the Futures that were obtained earlier
// and passed to us from upstream processing. If any one fails then we'll fail canCommit.
- buildCohortList().onComplete(new OnComplete<Void>() {
+ Futures.addCallback(resolveCohorts(), new FutureCallback<Void>() {
@Override
- public void onComplete(Throwable failure, Void notUsed) throws Throwable {
- if(failure != null) {
- if(LOG.isDebugEnabled()) {
- LOG.debug("Tx {}: a cohort Future failed: {}", transactionId, failure);
- }
- returnFuture.setException(failure);
- } else {
- finishCanCommit(returnFuture);
- }
+ public void onSuccess(Void notUsed) {
+ finishCanCommit(returnFuture);
}
- }, actorContext.getClientDispatcher());
+
+ @Override
+ public void onFailure(Throwable failure) {
+ returnFuture.setException(failure);
+ }
+ });
return returnFuture;
}
private void finishCanCommit(final SettableFuture<Boolean> returnFuture) {
- if(LOG.isDebugEnabled()) {
- LOG.debug("Tx {} finishCanCommit", transactionId);
- }
+ LOG.debug("Tx {} finishCanCommit", transactionId);
// For empty transactions return immediately
if(cohorts.size() == 0){
- if(LOG.isDebugEnabled()) {
- LOG.debug("Tx {}: canCommit returning result: {}", transactionId, true);
- }
+ LOG.debug("Tx {}: canCommit returning result true", transactionId);
returnFuture.set(Boolean.TRUE);
return;
}
- commitOperationCallback = cohortFutures.isEmpty() ? OperationCallback.NO_OP_CALLBACK :
- new TransactionRateLimitingCallback(actorContext);
-
+ commitOperationCallback = new TransactionRateLimitingCallback(actorContext);
commitOperationCallback.run();
- final Object message = new CanCommitTransaction(transactionId, DataStoreVersions.CURRENT_VERSION).toSerializable();
-
- final Iterator<ActorSelection> iterator = cohorts.iterator();
+ final Iterator<CohortInfo> iterator = cohorts.iterator();
final OnComplete<Object> onComplete = new OnComplete<Object>() {
@Override
- public void onComplete(Throwable failure, Object response) throws Throwable {
+ public void onComplete(Throwable failure, Object response) {
if (failure != null) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Tx {}: a canCommit cohort Future failed: {}", transactionId, failure);
- }
+ LOG.debug("Tx {}: a canCommit cohort Future failed", transactionId, failure);
+
returnFuture.setException(failure);
commitOperationCallback.failure();
return;
boolean result = true;
if (CanCommitTransactionReply.isSerializedType(response)) {
- CanCommitTransactionReply reply =
- CanCommitTransactionReply.fromSerializable(response);
+ CanCommitTransactionReply reply = CanCommitTransactionReply.fromSerializable(response);
+
+ LOG.debug("Tx {}: received {}", transactionId, response);
+
if (!reply.getCanCommit()) {
result = false;
}
return;
}
- if(iterator.hasNext() && result){
- Future<Object> future = actorContext.executeOperationAsync(iterator.next(), message,
- actorContext.getTransactionCommitOperationTimeout());
- future.onComplete(this, actorContext.getClientDispatcher());
+ if(iterator.hasNext() && result) {
+ sendCanCommitTransaction(iterator.next(), this);
} else {
- if(LOG.isDebugEnabled()) {
- LOG.debug("Tx {}: canCommit returning result: {}", transactionId, result);
- }
+ LOG.debug("Tx {}: canCommit returning result: {}", transactionId, result);
returnFuture.set(Boolean.valueOf(result));
}
}
};
- Future<Object> future = actorContext.executeOperationAsync(iterator.next(), message,
- actorContext.getTransactionCommitOperationTimeout());
+ sendCanCommitTransaction(iterator.next(), onComplete);
+ }
+
+ private void sendCanCommitTransaction(CohortInfo toCohortInfo, OnComplete<Object> onComplete) {
+ CanCommitTransaction message = new CanCommitTransaction(transactionId, toCohortInfo.getActorVersion());
+
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Tx {}: sending {} to {}", transactionId, message, toCohortInfo.getResolvedActor());
+ }
+
+ Future<Object> future = actorContext.executeOperationAsync(toCohortInfo.getResolvedActor(),
+ message.toSerializable(), actorContext.getTransactionCommitOperationTimeout());
future.onComplete(onComplete, actorContext.getClientDispatcher());
}
- private Future<Iterable<Object>> invokeCohorts(Object message) {
+ private Future<Iterable<Object>> invokeCohorts(MessageSupplier messageSupplier) {
List<Future<Object>> futureList = Lists.newArrayListWithCapacity(cohorts.size());
- for(ActorSelection cohort : cohorts) {
+ for(CohortInfo cohort : cohorts) {
+ Object message = messageSupplier.newMessage(transactionId, cohort.getActorVersion());
+
if(LOG.isDebugEnabled()) {
- LOG.debug("Tx {}: Sending {} to cohort {}", transactionId, message, cohort);
+ LOG.debug("Tx {}: Sending {} to cohort {}", transactionId, message , cohort);
}
- futureList.add(actorContext.executeOperationAsync(cohort, message, actorContext.getTransactionCommitOperationTimeout()));
+
+ futureList.add(actorContext.executeOperationAsync(cohort.getResolvedActor(), message,
+ actorContext.getTransactionCommitOperationTimeout()));
}
- return Futures.sequence(futureList, actorContext.getClientDispatcher());
+ return akka.dispatch.Futures.sequence(futureList, actorContext.getClientDispatcher());
}
@Override
// exception then that exception will supersede and suppress the original exception. But
// it's the original exception that is the root cause and of more interest to the client.
- return voidOperation("abort", new AbortTransaction(transactionId, DataStoreVersions.CURRENT_VERSION).toSerializable(),
- AbortTransactionReply.class, false);
+ return voidOperation("abort", ABORT_MESSAGE_SUPPLIER,
+ AbortTransactionReply.class, false, OperationCallback.NO_OP_CALLBACK);
}
@Override
OperationCallback operationCallback = commitOperationCallback != null ? commitOperationCallback :
OperationCallback.NO_OP_CALLBACK;
- return voidOperation("commit", new CommitTransaction(transactionId, DataStoreVersions.CURRENT_VERSION).toSerializable(),
+ return voidOperation("commit", COMMIT_MESSAGE_SUPPLIER,
CommitTransactionReply.class, true, operationCallback);
}
- private ListenableFuture<Void> voidOperation(final String operationName, final Object message,
- final Class<?> expectedResponseClass, final boolean propagateException) {
- return voidOperation(operationName, message, expectedResponseClass, propagateException,
- OperationCallback.NO_OP_CALLBACK);
+ private boolean successfulFuture(ListenableFuture<Void> future) {
+ if(!future.isDone()) {
+ return false;
+ }
+
+ try {
+ future.get();
+ return true;
+ } catch(Exception e) {
+ return false;
+ }
}
- private ListenableFuture<Void> voidOperation(final String operationName, final Object message,
- final Class<?> expectedResponseClass, final boolean propagateException, final OperationCallback callback) {
+ private ListenableFuture<Void> voidOperation(final String operationName,
+ final MessageSupplier messageSupplier, final Class<?> expectedResponseClass,
+ final boolean propagateException, final OperationCallback callback) {
+ LOG.debug("Tx {} {}", transactionId, operationName);
- if(LOG.isDebugEnabled()) {
- LOG.debug("Tx {} {}", transactionId, operationName);
- }
final SettableFuture<Void> returnFuture = SettableFuture.create();
// The cohort actor list should already be built at this point by the canCommit phase but,
// if not for some reason, we'll try to build it here.
- if(cohorts != null) {
- finishVoidOperation(operationName, message, expectedResponseClass, propagateException,
+ ListenableFuture<Void> future = resolveCohorts();
+ if(successfulFuture(future)) {
+ finishVoidOperation(operationName, messageSupplier, expectedResponseClass, propagateException,
returnFuture, callback);
} else {
- buildCohortList().onComplete(new OnComplete<Void>() {
+ Futures.addCallback(future, new FutureCallback<Void>() {
@Override
- public void onComplete(Throwable failure, Void notUsed) throws Throwable {
- if(failure != null) {
- if(LOG.isDebugEnabled()) {
- LOG.debug("Tx {}: a {} cohort path Future failed: {}", transactionId,
- operationName, failure);
- }
- if(propagateException) {
- returnFuture.setException(failure);
- } else {
- returnFuture.set(null);
- }
+ public void onSuccess(Void notUsed) {
+ finishVoidOperation(operationName, messageSupplier, expectedResponseClass,
+ propagateException, returnFuture, callback);
+ }
+
+ @Override
+ public void onFailure(Throwable failure) {
+ LOG.debug("Tx {}: a {} cohort path Future failed: {}", transactionId, operationName, failure);
+
+ if(propagateException) {
+ returnFuture.setException(failure);
} else {
- finishVoidOperation(operationName, message, expectedResponseClass,
- propagateException, returnFuture, callback);
+ returnFuture.set(null);
}
}
- }, actorContext.getClientDispatcher());
+ });
}
return returnFuture;
}
- private void finishVoidOperation(final String operationName, final Object message,
+ private void finishVoidOperation(final String operationName, MessageSupplier messageSupplier,
final Class<?> expectedResponseClass, final boolean propagateException,
final SettableFuture<Void> returnFuture, final OperationCallback callback) {
- if(LOG.isDebugEnabled()) {
- LOG.debug("Tx {} finish {}", transactionId, operationName);
- }
+ LOG.debug("Tx {} finish {}", transactionId, operationName);
callback.resume();
- Future<Iterable<Object>> combinedFuture = invokeCohorts(message);
+ Future<Iterable<Object>> combinedFuture = invokeCohorts(messageSupplier);
combinedFuture.onComplete(new OnComplete<Iterable<Object>>() {
@Override
public void onComplete(Throwable failure, Iterable<Object> responses) throws Throwable {
-
Throwable exceptionToPropagate = failure;
if(exceptionToPropagate == null) {
for(Object response: responses) {
if(!response.getClass().equals(expectedResponseClass)) {
exceptionToPropagate = new IllegalArgumentException(
- String.format("Unexpected response type %s",
- response.getClass()));
+ String.format("Unexpected response type %s", response.getClass()));
break;
}
}
}
if(exceptionToPropagate != null) {
-
- if(LOG.isDebugEnabled()) {
- LOG.debug("Tx {}: a {} cohort Future failed: {}", transactionId,
- operationName, exceptionToPropagate);
- }
+ LOG.debug("Tx {}: a {} cohort Future failed", transactionId, operationName, exceptionToPropagate);
if(propagateException) {
// We don't log the exception here to avoid redundant logging since we're
// propagating to the caller in MD-SAL core who will log it.
// Since the caller doesn't want us to propagate the exception we'll also
// not log it normally. But it's usually not good to totally silence
// exceptions so we'll log it to debug level.
- if(LOG.isDebugEnabled()) {
- LOG.debug(String.format("%s failed", message.getClass().getSimpleName()),
- exceptionToPropagate);
- }
returnFuture.set(null);
}
callback.failure();
} else {
+ LOG.debug("Tx {}: {} succeeded", transactionId, operationName);
- if(LOG.isDebugEnabled()) {
- LOG.debug("Tx {}: {} succeeded", transactionId, operationName);
- }
returnFuture.set(null);
callback.success();
@Override
List<Future<ActorSelection>> getCohortFutures() {
- return Collections.unmodifiableList(cohortFutures);
+ List<Future<ActorSelection>> cohortFutures = new ArrayList<>(cohorts.size());
+ for(CohortInfo info: cohorts) {
+ cohortFutures.add(info.getActorFuture());
+ }
+
+ return cohortFutures;
+ }
+
+ static class CohortInfo {
+ private final Future<ActorSelection> actorFuture;
+ private volatile ActorSelection resolvedActor;
+ private final Supplier<Short> actorVersionSupplier;
+
+ CohortInfo(Future<ActorSelection> actorFuture, Supplier<Short> actorVersionSupplier) {
+ this.actorFuture = actorFuture;
+ this.actorVersionSupplier = actorVersionSupplier;
+ }
+
+ Future<ActorSelection> getActorFuture() {
+ return actorFuture;
+ }
+
+ ActorSelection getResolvedActor() {
+ return resolvedActor;
+ }
+
+ void setResolvedActor(ActorSelection resolvedActor) {
+ this.resolvedActor = resolvedActor;
+ }
+
+ short getActorVersion() {
+ Preconditions.checkState(resolvedActor != null,
+ "getActorVersion cannot be called until the actor is resolved");
+ return actorVersionSupplier.get();
+ }
+ }
+
+ private interface MessageSupplier {
+ Object newMessage(String transactionId, short version);
+ boolean isSerializedReplyType(Object reply);
}
}
* @return
*/
boolean usesOperationLimiting();
+
+ short getTransactionVersion();
}
import com.google.common.base.Function;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
+import com.google.common.base.Supplier;
import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.CheckedFuture;
import com.google.common.util.concurrent.Futures;
return debugContext == null ? ret : new DebugThreePhaseCommitCohort(getIdentifier(), ret, debugContext);
}
+ @SuppressWarnings({ "rawtypes", "unchecked" })
private AbstractThreePhaseCommitCohort<?> createSingleCommitCohort(final String shardName,
final TransactionContextWrapper contextWrapper) {
private AbstractThreePhaseCommitCohort<ActorSelection> createMultiCommitCohort(
final Set<Entry<String, TransactionContextWrapper>> txContextWrapperEntries) {
- final List<Future<ActorSelection>> cohortFutures = new ArrayList<>(txContextWrapperEntries.size());
+ final List<ThreePhaseCommitCohortProxy.CohortInfo> cohorts = new ArrayList<>(txContextWrapperEntries.size());
for (Entry<String, TransactionContextWrapper> e : txContextWrapperEntries) {
LOG.debug("Tx {} Readying transaction for shard {}", getIdentifier(), e.getKey());
- cohortFutures.add(e.getValue().readyTransaction());
+ final TransactionContextWrapper wrapper = e.getValue();
+
+ // The remote tx version is obtained the via TransactionContext which may not be available yet so
+ // we pass a Supplier to dynamically obtain it. Once the ready Future is resolved the
+ // TransactionContext is available.
+ Supplier<Short> txVersionSupplier = new Supplier<Short>() {
+ @Override
+ public Short get() {
+ return wrapper.getTransactionContext().getTransactionVersion();
+ }
+ };
+
+ cohorts.add(new ThreePhaseCommitCohortProxy.CohortInfo(wrapper.readyTransaction(), txVersionSupplier));
}
- return new ThreePhaseCommitCohortProxy(txContextFactory.getActorContext(), cohortFutures, getIdentifier().toString());
+ return new ThreePhaseCommitCohortProxy(txContextFactory.getActorContext(), cohorts,
+ getIdentifier().toString());
}
private String shardNameFromIdentifier(final YangInstanceIdentifier path) {
import akka.testkit.TestActorRef;
import com.google.common.base.Function;
import com.google.common.base.Optional;
+import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.Uninterruptibles;
import java.util.Collections;
}
}
+ protected ShardDataTreeCohort mockShardDataTreeCohort() {
+ ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class);
+ doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
+ doReturn(Futures.immediateFuture(null)).when(cohort).preCommit();
+ doReturn(Futures.immediateFuture(null)).when(cohort).commit();
+ doReturn(mockCandidate("candidate")).when(cohort).getCandidate();
+ return cohort;
+ }
+
static ShardDataTreeTransactionParent newShardDataTreeTransactionParent(ShardDataTreeCohort cohort) {
ShardDataTreeTransactionParent mockParent = mock(ShardDataTreeTransactionParent.class);
doReturn(cohort).when(mockParent).finishTransaction(any(ReadWriteShardDataTreeTransaction.class));
assertEquals(TestModel.ID_QNAME.getLocalName() + " value", expIDValue, idLeaf.get().getValue());
}
- static DataTreeCandidateTip mockCandidate(final String name) {
+ public static DataTreeCandidateTip mockCandidate(final String name) {
final DataTreeCandidateTip mockCandidate = mock(DataTreeCandidateTip.class, name);
final DataTreeCandidateNode mockCandidateNode = mock(DataTreeCandidateNode.class, name + "-node");
doReturn(ModificationType.WRITE).when(mockCandidateNode).getModificationType();
import akka.pattern.Patterns;
import akka.testkit.JavaTestKit;
import com.google.common.base.Optional;
+import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.Uninterruptibles;
verifyCars(followerDistributedDataStore.newReadOnlyTransaction(), car1);
}
+ @SuppressWarnings("unchecked")
@Test
public void testReadyLocalTransactionForwardedToLeader() throws Exception {
initDatastoresWithCars("testReadyLocalTransactionForwardedToLeader");
ActorSelection txActor = leaderDistributedDataStore.getActorContext().actorSelection(
((ReadyTransactionReply)resp).getCohortPath());
+ Supplier<Short> versionSupplier = Mockito.mock(Supplier.class);
+ Mockito.doReturn(DataStoreVersions.CURRENT_VERSION).when(versionSupplier).get();
ThreePhaseCommitCohortProxy cohort = new ThreePhaseCommitCohortProxy(
- leaderDistributedDataStore.getActorContext(), Arrays.asList(Futures.successful(txActor)), "tx-2");
+ leaderDistributedDataStore.getActorContext(), Arrays.asList(
+ new ThreePhaseCommitCohortProxy.CohortInfo(Futures.successful(txActor), versionSupplier)), "tx-2");
cohort.canCommit().get(5, TimeUnit.SECONDS);
cohort.preCommit().get(5, TimeUnit.SECONDS);
cohort.commit().get(5, TimeUnit.SECONDS);
verifyCars(leaderDistributedDataStore.newReadOnlyTransaction(), car1, car2);
}
+ @SuppressWarnings("unchecked")
@Test
public void testForwardedReadyTransactionForwardedToLeader() throws Exception {
initDatastoresWithCars("testForwardedReadyTransactionForwardedToLeader");
ActorSelection txActor = leaderDistributedDataStore.getActorContext().actorSelection(
((ReadyTransactionReply)resp).getCohortPath());
+ Supplier<Short> versionSupplier = Mockito.mock(Supplier.class);
+ Mockito.doReturn(DataStoreVersions.CURRENT_VERSION).when(versionSupplier).get();
ThreePhaseCommitCohortProxy cohort = new ThreePhaseCommitCohortProxy(
- leaderDistributedDataStore.getActorContext(), Arrays.asList(Futures.successful(txActor)), "tx-2");
+ leaderDistributedDataStore.getActorContext(), Arrays.asList(
+ new ThreePhaseCommitCohortProxy.CohortInfo(Futures.successful(txActor), versionSupplier)), "tx-2");
cohort.canCommit().get(5, TimeUnit.SECONDS);
cohort.preCommit().get(5, TimeUnit.SECONDS);
cohort.commit().get(5, TimeUnit.SECONDS);
package org.opendaylight.controller.cluster.datastore;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.fail;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.isA;
import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
import static org.opendaylight.controller.cluster.datastore.DataStoreVersions.CURRENT_VERSION;
-import akka.actor.ActorPath;
import akka.actor.ActorSelection;
import akka.actor.Props;
+import akka.actor.UntypedActor;
+import akka.dispatch.Dispatchers;
import akka.dispatch.Futures;
-import akka.util.Timeout;
+import akka.testkit.TestActorRef;
import com.codahale.metrics.Snapshot;
import com.codahale.metrics.Timer;
-import com.google.common.collect.Lists;
+import com.google.common.base.Supplier;
import com.google.common.util.concurrent.ListenableFuture;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
-import org.mockito.stubbing.Stubber;
+import org.opendaylight.controller.cluster.datastore.ThreePhaseCommitCohortProxy.CohortInfo;
import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
+import org.opendaylight.controller.cluster.datastore.messages.AbstractThreePhaseCommitMessage;
import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
-import org.opendaylight.controller.cluster.datastore.messages.SerializableMessage;
import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
+import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper;
+import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
+import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache;
+import org.opendaylight.controller.cluster.raft.TestActorFactory;
import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
-import scala.concurrent.Future;
-import scala.concurrent.duration.Duration;
+import org.opendaylight.controller.protobuff.messages.cohort3pc.ThreePhaseCommitCohortMessages;
public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest {
static class TestException extends RuntimeException {
}
- @Mock
private ActorContext actorContext;
- @Mock
- private DatastoreContext datastoreContext;
-
@Mock
private Timer commitTimer;
@Mock
private Snapshot commitSnapshot;
+ private final TestActorFactory actorFactory = new TestActorFactory(getSystem());
+ private final List<TestActorRef<CohortActor>> cohortActors = new ArrayList<>();
+
@Before
public void setUp() {
MockitoAnnotations.initMocks(this);
- doReturn(getSystem()).when(actorContext).getActorSystem();
- doReturn(getSystem().dispatchers().defaultGlobalDispatcher()).when(actorContext).getClientDispatcher();
- doReturn(datastoreContext).when(actorContext).getDatastoreContext();
- doReturn(30).when(datastoreContext).getShardTransactionCommitTimeoutInSeconds();
- doReturn(commitTimer).when(actorContext).getOperationTimer("commit");
+ actorContext = new ActorContext(getSystem(), actorFactory.createActor(Props.create(DoNothingActor.class)),
+ new MockClusterWrapper(), new MockConfiguration(),
+ DatastoreContext.newBuilder().build(), new PrimaryShardInfoFutureCache()) {
+ @Override
+ public Timer getOperationTimer(String operationName) {
+ return commitTimer;
+ }
+
+ @Override
+ public double getTxCreationLimit() {
+ return 10.0;
+ }
+ };
+
doReturn(commitTimerContext).when(commitTimer).time();
doReturn(commitSnapshot).when(commitTimer).getSnapshot();
for(int i=1;i<11;i++){
// percentile. Essentially this would be 1ms for the 10th percentile, 2ms for 20th percentile and so on.
doReturn(TimeUnit.MILLISECONDS.toNanos(i) * 1D).when(commitSnapshot).getValue(i * 0.1);
}
- doReturn(10.0).when(actorContext).getTxCreationLimit();
- }
-
- private Future<ActorSelection> newCohort() {
- ActorPath path = getSystem().actorOf(Props.create(DoNothingActor.class)).path();
- ActorSelection actorSelection = getSystem().actorSelection(path);
- return Futures.successful(actorSelection);
- }
-
- private final ThreePhaseCommitCohortProxy setupProxy(int nCohorts) throws Exception {
- List<Future<ActorSelection>> cohortFutures = Lists.newArrayList();
- for(int i = 1; i <= nCohorts; i++) {
- cohortFutures.add(newCohort());
- }
-
- return new ThreePhaseCommitCohortProxy(actorContext, cohortFutures, "txn-1");
}
- private ThreePhaseCommitCohortProxy setupProxyWithFailedCohortPath()
- throws Exception {
- List<Future<ActorSelection>> cohortFutures = Lists.newArrayList();
- cohortFutures.add(newCohort());
- cohortFutures.add(Futures.<ActorSelection>failed(new TestException()));
-
- return new ThreePhaseCommitCohortProxy(actorContext, cohortFutures, "txn-1");
- }
-
- private void setupMockActorContext(Class<?> requestType, Object... responses) {
- Stubber stubber = doReturn(responses[0] instanceof Throwable ? Futures
- .failed((Throwable) responses[0]) : Futures
- .successful(((SerializableMessage) responses[0]).toSerializable()));
-
- for(int i = 1; i < responses.length; i++) {
- stubber = stubber.doReturn(responses[i] instanceof Throwable ? Futures
- .failed((Throwable) responses[i]) : Futures
- .successful(((SerializableMessage) responses[i]).toSerializable()));
- }
-
- stubber.when(actorContext).executeOperationAsync(any(ActorSelection.class),
- isA(requestType), any(Timeout.class));
-
- doReturn(new Timeout(Duration.apply(1000, TimeUnit.MILLISECONDS)))
- .when(actorContext).getTransactionCommitOperationTimeout();
- }
-
- private void verifyCohortInvocations(int nCohorts, Class<?> requestType) {
- verify(actorContext, times(nCohorts)).executeOperationAsync(
- any(ActorSelection.class), isA(requestType), any(Timeout.class));
- }
-
- private static void propagateExecutionExceptionCause(ListenableFuture<?> future) throws Throwable {
+ @Test
+ public void testCanCommitYesWithOneCohort() throws Exception {
+ ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, Arrays.asList(
+ newCohortInfo(new CohortActor.Builder("txn-1").expectCanCommit(
+ CanCommitTransactionReply.yes(CURRENT_VERSION)))), "txn-1");
- try {
- future.get(5, TimeUnit.SECONDS);
- fail("Expected ExecutionException");
- } catch(ExecutionException e) {
- throw e.getCause();
- }
+ verifyCanCommit(proxy.canCommit(), true);
+ verifyCohortActors();
}
@Test
- public void testCanCommitWithOneCohort() throws Exception {
-
- ThreePhaseCommitCohortProxy proxy = setupProxy(1);
-
- setupMockActorContext(CanCommitTransaction.class, CanCommitTransactionReply.yes(CURRENT_VERSION));
-
- ListenableFuture<Boolean> future = proxy.canCommit();
-
- assertEquals("canCommit", true, future.get(5, TimeUnit.SECONDS));
+ public void testCanCommitNoWithOneCohort() throws Exception {
+ ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, Arrays.asList(
+ newCohortInfo(new CohortActor.Builder("txn-1").expectCanCommit(
+ CanCommitTransactionReply.no(CURRENT_VERSION)))), "txn-1");
- setupMockActorContext(CanCommitTransaction.class, CanCommitTransactionReply.no(CURRENT_VERSION));
-
- future = proxy.canCommit();
-
- assertEquals("canCommit", false, future.get(5, TimeUnit.SECONDS));
-
- verifyCohortInvocations(2, CanCommitTransaction.class);
+ verifyCanCommit(proxy.canCommit(), false);
+ verifyCohortActors();
}
@Test
- public void testCanCommitWithMultipleCohorts() throws Exception {
-
- ThreePhaseCommitCohortProxy proxy = setupProxy(2);
-
- setupMockActorContext(CanCommitTransaction.class,
- CanCommitTransactionReply.yes(CURRENT_VERSION), CanCommitTransactionReply.yes(CURRENT_VERSION));
-
- ListenableFuture<Boolean> future = proxy.canCommit();
-
- assertEquals("canCommit", true, future.get(5, TimeUnit.SECONDS));
-
- verifyCohortInvocations(2, CanCommitTransaction.class);
+ public void testCanCommitYesWithTwoCohorts() throws Exception {
+ List<CohortInfo> cohorts = Arrays.asList(
+ newCohortInfo(new CohortActor.Builder("txn-1").expectCanCommit(
+ CanCommitTransactionReply.yes(CURRENT_VERSION))),
+ newCohortInfo(new CohortActor.Builder("txn-1").expectCanCommit(
+ CanCommitTransactionReply.yes(CURRENT_VERSION))));
+ ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, cohorts, "txn-1");
+
+ verifyCanCommit(proxy.canCommit(), true);
+ verifyCohortActors();
}
@Test
- public void testCanCommitWithMultipleCohortsAndOneFailure() throws Exception {
-
- ThreePhaseCommitCohortProxy proxy = setupProxy(3);
-
- setupMockActorContext(CanCommitTransaction.class,
- CanCommitTransactionReply.yes(CURRENT_VERSION), CanCommitTransactionReply.no(CURRENT_VERSION),
- CanCommitTransactionReply.yes(CURRENT_VERSION));
-
- ListenableFuture<Boolean> future = proxy.canCommit();
-
- Boolean actual = future.get(5, TimeUnit.SECONDS);
-
- assertEquals("canCommit", false, actual);
-
- verifyCohortInvocations(2, CanCommitTransaction.class);
+ public void testCanCommitNoWithThreeCohorts() throws Exception {
+ List<CohortInfo> cohorts = Arrays.asList(
+ newCohortInfo(new CohortActor.Builder("txn-1").expectCanCommit(
+ CanCommitTransactionReply.yes(CURRENT_VERSION))),
+ newCohortInfo(new CohortActor.Builder("txn-1").expectCanCommit(
+ CanCommitTransactionReply.no(CURRENT_VERSION))),
+ newCohortInfo(new CohortActor.Builder("txn-1")));
+ ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, cohorts, "txn-1");
+
+ verifyCanCommit(proxy.canCommit(), false);
+ verifyCohortActors();
}
@Test(expected = TestException.class)
public void testCanCommitWithExceptionFailure() throws Throwable {
+ ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, Arrays.asList(
+ newCohortInfo(new CohortActor.Builder("txn-1").expectCanCommit(new TestException()))), "txn-1");
- ThreePhaseCommitCohortProxy proxy = setupProxy(1);
+ propagateExecutionExceptionCause(proxy.canCommit());
+ }
- setupMockActorContext(CanCommitTransaction.class, new TestException());
+ @Test(expected = IllegalArgumentException.class)
+ public void testCanCommitWithInvalidResponseType() throws Throwable {
+ ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, Arrays.asList(
+ newCohortInfo(new CohortActor.Builder("txn-1").expectCanCommit("invalid"))), "txn-1");
propagateExecutionExceptionCause(proxy.canCommit());
}
- @Test(expected = ExecutionException.class)
- public void testCanCommitWithInvalidResponseType() throws Exception {
-
- ThreePhaseCommitCohortProxy proxy = setupProxy(1);
+ @Test(expected = TestException.class)
+ public void testCanCommitWithFailedCohortFuture() throws Throwable {
+ List<CohortInfo> cohorts = Arrays.asList(
+ newCohortInfo(new CohortActor.Builder("txn-1")),
+ newCohortInfoWithFailedFuture(new TestException()),
+ newCohortInfo(new CohortActor.Builder("txn-1")));
+ ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, cohorts, "txn-1");
- setupMockActorContext(CanCommitTransaction.class,
- new CommitTransactionReply());
+ propagateExecutionExceptionCause(proxy.canCommit());
+ }
- proxy.canCommit().get(5, TimeUnit.SECONDS);
+ @Test
+ public void testAllThreePhasesSuccessful() throws Exception {
+ List<CohortInfo> cohorts = Arrays.asList(
+ newCohortInfo(new CohortActor.Builder("txn-1").
+ expectCanCommit(CanCommitTransactionReply.yes(CURRENT_VERSION)).
+ expectCommit(CommitTransactionReply.instance(CURRENT_VERSION))),
+ newCohortInfo(new CohortActor.Builder("txn-1").
+ expectCanCommit(CanCommitTransactionReply.yes(CURRENT_VERSION)).
+ expectCommit(CommitTransactionReply.instance(CURRENT_VERSION))));
+ ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, cohorts, "txn-1");
+
+ verifyCanCommit(proxy.canCommit(), true);
+ verifySuccessfulFuture(proxy.preCommit());
+ verifySuccessfulFuture(proxy.commit());
+ verifyCohortActors();
}
@Test(expected = TestException.class)
- public void testCanCommitWithFailedCohortPath() throws Throwable {
-
- ThreePhaseCommitCohortProxy proxy = setupProxyWithFailedCohortPath();
-
- try {
- propagateExecutionExceptionCause(proxy.canCommit());
- } finally {
- verifyCohortInvocations(0, CanCommitTransaction.class);
- }
+ public void testCommitWithExceptionFailure() throws Throwable {
+ List<CohortInfo> cohorts = Arrays.asList(
+ newCohortInfo(new CohortActor.Builder("txn-1").
+ expectCanCommit(CanCommitTransactionReply.yes(CURRENT_VERSION)).
+ expectCommit(CommitTransactionReply.instance(CURRENT_VERSION))),
+ newCohortInfo(new CohortActor.Builder("txn-1").
+ expectCanCommit(CanCommitTransactionReply.yes(CURRENT_VERSION)).
+ expectCommit(new TestException())));
+ ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, cohorts, "txn-1");
+
+ verifyCanCommit(proxy.canCommit(), true);
+ verifySuccessfulFuture(proxy.preCommit());
+ propagateExecutionExceptionCause(proxy.commit());
}
- @Test
- public void testPreCommit() throws Exception {
- // Precommit is currently a no-op
- ThreePhaseCommitCohortProxy proxy = setupProxy(1);
+ @Test(expected = IllegalArgumentException.class)
+ public void testCommitWithInvalidResponseType() throws Throwable {
+ ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, Arrays.asList(
+ newCohortInfo(new CohortActor.Builder("txn-1").
+ expectCanCommit(CanCommitTransactionReply.yes(CURRENT_VERSION)).
+ expectCommit("invalid"))), "txn-1");
- proxy.preCommit().get(5, TimeUnit.SECONDS);
+ verifyCanCommit(proxy.canCommit(), true);
+ verifySuccessfulFuture(proxy.preCommit());
+ propagateExecutionExceptionCause(proxy.commit());
}
@Test
public void testAbort() throws Exception {
- ThreePhaseCommitCohortProxy proxy = setupProxy(1);
-
- setupMockActorContext(AbortTransaction.class, new AbortTransactionReply());
-
- proxy.abort().get(5, TimeUnit.SECONDS);
+ ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, Arrays.asList(
+ newCohortInfo(new CohortActor.Builder("txn-1").expectAbort(
+ AbortTransactionReply.instance(CURRENT_VERSION)))), "txn-1");
- verifyCohortInvocations(1, AbortTransaction.class);
+ verifySuccessfulFuture(proxy.abort());
+ verifyCohortActors();
}
@Test
public void testAbortWithFailure() throws Exception {
- ThreePhaseCommitCohortProxy proxy = setupProxy(1);
-
- setupMockActorContext(AbortTransaction.class, new RuntimeException("mock"));
+ ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, Arrays.asList(
+ newCohortInfo(new CohortActor.Builder("txn-1").expectAbort(new RuntimeException("mock")))), "txn-1");
// The exception should not get propagated.
- proxy.abort().get(5, TimeUnit.SECONDS);
-
- verifyCohortInvocations(1, AbortTransaction.class);
+ verifySuccessfulFuture(proxy.abort());
+ verifyCohortActors();
}
@Test
- public void testAbortWithFailedCohortPath() throws Throwable {
-
- ThreePhaseCommitCohortProxy proxy = setupProxyWithFailedCohortPath();
-
- // The exception should not get propagated.
- proxy.abort().get(5, TimeUnit.SECONDS);
-
- verifyCohortInvocations(0, AbortTransaction.class);
+ public void testAbortWithFailedCohortFuture() throws Throwable {
+ List<CohortInfo> cohorts = Arrays.asList(
+ newCohortInfoWithFailedFuture(new TestException()),
+ newCohortInfo(new CohortActor.Builder("txn-1")));
+ ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, cohorts, "txn-1");
+
+ verifySuccessfulFuture(proxy.abort());
+ verifyCohortActors();
}
@Test
- public void testCommit() throws Exception {
-
- ThreePhaseCommitCohortProxy proxy = setupProxy(2);
-
- setupMockActorContext(CommitTransaction.class, new CommitTransactionReply(),
- new CommitTransactionReply());
-
- proxy.commit().get(5, TimeUnit.SECONDS);
-
- verifyCohortInvocations(2, CommitTransaction.class);
+ public void testWithNoCohorts() throws Exception {
+ ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext,
+ Collections.<CohortInfo>emptyList(), "txn-1");
+
+ verifyCanCommit(proxy.canCommit(), true);
+ verifySuccessfulFuture(proxy.preCommit());
+ verifySuccessfulFuture(proxy.commit());
+ verifyCohortActors();
}
- @Test(expected = TestException.class)
- public void testCommitWithFailure() throws Throwable {
-
- ThreePhaseCommitCohortProxy proxy = setupProxy(2);
-
- setupMockActorContext(CommitTransaction.class, new CommitTransactionReply(),
- new TestException());
-
- propagateExecutionExceptionCause(proxy.commit());
+ @Test
+ public void testBackwardsCompatibilityWithPreBoron() throws Exception {
+ List<CohortInfo> cohorts = Arrays.asList(
+ newCohortInfo(new CohortActor.Builder("txn-1").
+ expectCanCommit(ThreePhaseCommitCohortMessages.CanCommitTransaction.class,
+ CanCommitTransactionReply.yes(DataStoreVersions.LITHIUM_VERSION)).
+ expectCommit(ThreePhaseCommitCohortMessages.CommitTransaction.class,
+ CommitTransactionReply.instance(DataStoreVersions.LITHIUM_VERSION)),
+ DataStoreVersions.LITHIUM_VERSION));
+ ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, cohorts, "txn-1");
+
+ verifyCanCommit(proxy.canCommit(), true);
+ verifySuccessfulFuture(proxy.preCommit());
+ verifySuccessfulFuture(proxy.commit());
+ verifyCohortActors();
}
- @Test(expected = ExecutionException.class)
- public void testCommitWithInvalidResponseType() throws Exception {
+ private void propagateExecutionExceptionCause(ListenableFuture<?> future) throws Throwable {
- ThreePhaseCommitCohortProxy proxy = setupProxy(1);
+ try {
+ future.get(5, TimeUnit.SECONDS);
+ fail("Expected ExecutionException");
+ } catch(ExecutionException e) {
+ verifyCohortActors();
+ throw e.getCause();
+ }
+ }
- setupMockActorContext(CommitTransaction.class, new AbortTransactionReply());
+ private CohortInfo newCohortInfo(CohortActor.Builder builder, final short version) {
+ TestActorRef<CohortActor> actor = actorFactory.createTestActor(builder.props().
+ withDispatcher(Dispatchers.DefaultDispatcherId()), actorFactory.generateActorId("cohort"));
+ cohortActors.add(actor);
+ return new CohortInfo(Futures.successful(getSystem().actorSelection(actor.path())), new Supplier<Short>() {
+ @Override
+ public Short get() {
+ return version;
+ }
+ });
+ }
- proxy.commit().get(5, TimeUnit.SECONDS);
+ private CohortInfo newCohortInfoWithFailedFuture(Exception failure) {
+ return new CohortInfo(Futures.<ActorSelection>failed(failure), new Supplier<Short>() {
+ @Override
+ public Short get() {
+ return CURRENT_VERSION;
+ }
+ });
}
- @Test(expected = TestException.class)
- public void testCommitWithFailedCohortPath() throws Throwable {
+ private CohortInfo newCohortInfo(CohortActor.Builder builder) {
+ return newCohortInfo(builder, CURRENT_VERSION);
+ }
- ThreePhaseCommitCohortProxy proxy = setupProxyWithFailedCohortPath();
+ private void verifyCohortActors() {
+ for(TestActorRef<CohortActor> actor: cohortActors) {
+ actor.underlyingActor().verify();
+ }
+ }
+ private <T> T verifySuccessfulFuture(ListenableFuture<T> future) throws Exception {
try {
- propagateExecutionExceptionCause(proxy.commit());
- } finally {
-
- verifyCohortInvocations(0, CommitTransaction.class);
+ return future.get(5, TimeUnit.SECONDS);
+ } catch(Exception e) {
+ verifyCohortActors();
+ throw e;
}
-
}
- @Test
- public void testAllThreePhasesSuccessful() throws Exception {
-
- ThreePhaseCommitCohortProxy proxy = setupProxy(2);
-
- setupMockActorContext(CanCommitTransaction.class,
- CanCommitTransactionReply.yes(CURRENT_VERSION), CanCommitTransactionReply.yes(CURRENT_VERSION));
-
- setupMockActorContext(CommitTransaction.class,
- new CommitTransactionReply(), new CommitTransactionReply());
+ private void verifyCanCommit(ListenableFuture<Boolean> future, boolean expected) throws Exception {
+ Boolean actual = verifySuccessfulFuture(future);
+ assertEquals("canCommit", expected, actual);
+ }
- assertEquals(10.0, actorContext.getTxCreationLimit(), 1e-15);
+ private static class CohortActor extends UntypedActor {
+ private final Builder builder;
+ private final AtomicInteger canCommitCount = new AtomicInteger();
+ private final AtomicInteger commitCount = new AtomicInteger();
+ private final AtomicInteger abortCount = new AtomicInteger();
+ private volatile AssertionError assertionError;
- proxy.canCommit().get(5, TimeUnit.SECONDS);
- proxy.preCommit().get(5, TimeUnit.SECONDS);
- proxy.commit().get(5, TimeUnit.SECONDS);
+ private CohortActor(Builder builder) {
+ this.builder = builder;
+ }
- verifyCohortInvocations(2, CanCommitTransaction.class);
- verifyCohortInvocations(2, CommitTransaction.class);
+ @Override
+ public void onReceive(Object message) {
+ if(CanCommitTransaction.isSerializedType(message)) {
+ onMessage("CanCommitTransaction", message, CanCommitTransaction.fromSerializable(message),
+ builder.expCanCommitType, builder.canCommitReply);
+ canCommitCount.incrementAndGet();
+ } else if(CommitTransaction.isSerializedType(message)) {
+ onMessage("CommitTransaction", message, CommitTransaction.fromSerializable(message),
+ builder.expCommitType, builder.commitReply);
+ commitCount.incrementAndGet();
+ } else if(AbortTransaction.isSerializedType(message)) {
+ onMessage("AbortTransaction", message, AbortTransaction.fromSerializable(message),
+ builder.expAbortType, builder.abortReply);
+ abortCount.incrementAndGet();
+ } else {
+ assertionError = new AssertionError("Unexpected message " + message);
+ }
+ }
- }
+ private void onMessage(String name, Object rawMessage, AbstractThreePhaseCommitMessage actualMessage,
+ Class<?> expType, Object reply) {
+ try {
+ assertNotNull("Unexpected " + name, expType);
+ assertEquals(name + " type", expType, rawMessage.getClass());
+ assertEquals(name + " transactionId", builder.transactionId, actualMessage.getTransactionID());
+
+ if(reply instanceof Throwable) {
+ getSender().tell(new akka.actor.Status.Failure((Throwable)reply), self());
+ } else {
+ getSender().tell(reply, self());
+ }
+ } catch(AssertionError e) {
+ assertionError = e;
+ }
+ }
- @Test
- public void testDoNotChangeTxCreationLimitWhenCommittingEmptyTxn() throws Exception {
+ void verify() {
+ if(assertionError != null) {
+ throw assertionError;
+ }
- ThreePhaseCommitCohortProxy proxy = setupProxy(0);
+ if(builder.expCanCommitType != null) {
+ assertEquals("CanCommitTransaction count", 1, canCommitCount.get());
+ }
- assertEquals(10.0, actorContext.getTxCreationLimit(), 1e-15);
+ if(builder.expCommitType != null) {
+ assertEquals("CommitTransaction count", 1, commitCount.get());
+ }
- proxy.canCommit().get(5, TimeUnit.SECONDS);
- proxy.preCommit().get(5, TimeUnit.SECONDS);
- proxy.commit().get(5, TimeUnit.SECONDS);
+ if(builder.expAbortType != null) {
+ assertEquals("AbortTransaction count", 1, abortCount.get());
+ }
+ }
+ static class Builder {
+ private Class<?> expCanCommitType;
+ private Class<?> expCommitType;
+ private Class<?> expAbortType;
+ private Object canCommitReply;
+ private Object commitReply;
+ private Object abortReply;
+ private final String transactionId;
+
+ Builder(String transactionId) {
+ this.transactionId = transactionId;
+ }
+
+ Builder expectCanCommit(Class<?> expCanCommitType, Object canCommitReply) {
+ this.expCanCommitType = expCanCommitType;
+ this.canCommitReply = canCommitReply;
+ return this;
+ }
+
+ Builder expectCanCommit(Object canCommitReply) {
+ return expectCanCommit(CanCommitTransaction.class, canCommitReply);
+ }
+
+ Builder expectCommit(Class<?> expCommitType, Object commitReply) {
+ this.expCommitType = expCommitType;
+ this.commitReply = commitReply;
+ return this;
+ }
+
+ Builder expectCommit(Object commitReply) {
+ return expectCommit(CommitTransaction.class, commitReply);
+ }
+
+ Builder expectAbort(Class<?> expAbortType, Object abortReply) {
+ this.expAbortType = expAbortType;
+ this.abortReply = abortReply;
+ return this;
+ }
+
+ Builder expectAbort(Object abortReply) {
+ return expectAbort(AbortTransaction.class, abortReply);
+ }
+
+ Props props() {
+ return Props.create(CohortActor.class, this);
+ }
+ }
}
}
*/
package org.opendaylight.controller.cluster.datastore.compat;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import akka.actor.ActorRef;
+import akka.dispatch.Dispatchers;
+import akka.testkit.TestActorRef;
import org.junit.Test;
import org.opendaylight.controller.cluster.datastore.AbstractShardTest;
import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
+import org.opendaylight.controller.cluster.datastore.Shard;
import org.opendaylight.controller.cluster.datastore.ShardTestKit;
import org.opendaylight.controller.cluster.datastore.TransactionType;
+import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
+import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
+import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
+import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
+import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
+import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
+import org.opendaylight.controller.protobuff.messages.cohort3pc.ThreePhaseCommitCohortMessages;
import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
+import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
/**
* Shard unit tests for backwards compatibility with pre-Boron versions.
path.contains("akka://test/user/testCreateTransaction/shard-txn-1"));
}};
}
+
+ @Test
+ public void testBatchedModificationsWithCommitOnReady() throws Throwable {
+ new ShardTestKit(getSystem()) {{
+ final TestActorRef<Shard> shard = actorFactory.createTestActor(
+ newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
+ "testBatchedModificationsWithCommitOnReady");
+
+ waitUntilLeader(shard);
+
+ final String transactionID = "tx";
+
+ BatchedModifications batched = new BatchedModifications(transactionID,
+ DataStoreVersions.LITHIUM_VERSION, "");
+ batched.addModification(new WriteModification(TestModel.TEST_PATH,
+ ImmutableNodes.containerNode(TestModel.TEST_QNAME)));
+ batched.setReady(true);
+ batched.setDoCommitOnReady(true);
+ batched.setTotalMessagesSent(1);
+
+ shard.tell(batched, getRef());
+ expectMsgClass(ThreePhaseCommitCohortMessages.CommitTransactionReply.class);
+ }};
+ }
+
+ @Test
+ public void testImmediateCommitWithForwardedReadyTransaction() throws Throwable {
+ new ShardTestKit(getSystem()) {{
+ final TestActorRef<Shard> shard = actorFactory.createTestActor(
+ newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
+ "testImmediateCommitWithForwardedReadyTransaction");
+
+ waitUntilLeader(shard);
+
+ final String transactionID = "tx";
+
+ shard.tell(prepareForwardedReadyTransaction(mockShardDataTreeCohort(), transactionID,
+ DataStoreVersions.LITHIUM_VERSION, true), getRef());
+
+ expectMsgClass(ThreePhaseCommitCohortMessages.CommitTransactionReply.class);
+ }};
+ }
+
+ @Test
+ public void testThreePhaseCommitOnForwardedReadyTransaction() throws Throwable {
+ new ShardTestKit(getSystem()) {{
+ final TestActorRef<Shard> shard = actorFactory.createTestActor(
+ newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
+ "testThreePhaseCommitOnForwardedReadyTransaction");
+
+ waitUntilLeader(shard);
+
+ final String transactionID = "tx";
+
+ shard.tell(prepareForwardedReadyTransaction(mockShardDataTreeCohort(), transactionID,
+ DataStoreVersions.LITHIUM_VERSION, false), getRef());
+ expectMsgClass(ReadyTransactionReply.class);
+
+ shard.tell(new CanCommitTransaction(transactionID, DataStoreVersions.LITHIUM_VERSION).toSerializable(), getRef());
+ CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
+ expectMsgClass(ThreePhaseCommitCohortMessages.CanCommitTransactionReply.class));
+ assertEquals("Can commit", true, canCommitReply.getCanCommit());
+
+ shard.tell(new CommitTransaction(transactionID, DataStoreVersions.LITHIUM_VERSION).toSerializable(), getRef());
+ expectMsgClass(ThreePhaseCommitCohortMessages.CommitTransactionReply.class);
+ }};
+ }
}