import akka.dispatch.Futures;
import akka.dispatch.OnComplete;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import org.slf4j.LoggerFactory;
import scala.concurrent.Future;
+import scala.runtime.AbstractFunction1;
import java.util.Collections;
import java.util.List;
*/
public class ThreePhaseCommitCohortProxy implements DOMStoreThreePhaseCommitCohort{
- private static final Logger LOG = LoggerFactory.getLogger(DistributedDataStore.class);
+ private static final Logger LOG = LoggerFactory.getLogger(ThreePhaseCommitCohortProxy.class);
private final ActorContext actorContext;
- private final List<ActorPath> cohortPaths;
+ private final List<Future<ActorPath>> cohortPathFutures;
+ private volatile List<ActorPath> cohortPaths;
private final String transactionId;
- public ThreePhaseCommitCohortProxy(ActorContext actorContext, List<ActorPath> cohortPaths,
- String transactionId) {
+ public ThreePhaseCommitCohortProxy(ActorContext actorContext,
+ List<Future<ActorPath>> cohortPathFutures, String transactionId) {
this.actorContext = actorContext;
- this.cohortPaths = cohortPaths;
+ this.cohortPathFutures = cohortPathFutures;
this.transactionId = transactionId;
}
+ private Future<Void> buildCohortPathsList() {
+
+ Future<Iterable<ActorPath>> combinedFutures = Futures.sequence(cohortPathFutures,
+ actorContext.getActorSystem().dispatcher());
+
+ return combinedFutures.transform(new AbstractFunction1<Iterable<ActorPath>, Void>() {
+ @Override
+ public Void apply(Iterable<ActorPath> paths) {
+ cohortPaths = Lists.newArrayList(paths);
+
+ LOG.debug("Tx {} successfully built cohort path list: {}",
+ transactionId, cohortPaths);
+ return null;
+ }
+ }, TransactionProxy.SAME_FAILURE_TRANSFORMER, actorContext.getActorSystem().dispatcher());
+ }
+
@Override
public ListenableFuture<Boolean> canCommit() {
- LOG.debug("txn {} canCommit", transactionId);
+ LOG.debug("Tx {} canCommit", transactionId);
+
+ final SettableFuture<Boolean> returnFuture = SettableFuture.create();
+
+ // The first phase of canCommit is to gather the list of cohort actor paths that will
+ // participate in the commit. buildCohortPathsList combines the cohort path Futures into
+ // one Future which we wait on asynchronously here. The cohort actor paths are
+ // extracted from ReadyTransactionReply messages by the Futures that were obtained earlier
+ // and passed to us from upstream processing. If any one fails then we'll fail canCommit.
+
+ buildCohortPathsList().onComplete(new OnComplete<Void>() {
+ @Override
+ public void onComplete(Throwable failure, Void notUsed) throws Throwable {
+ if(failure != null) {
+ LOG.debug("Tx {}: a cohort path Future failed: {}", transactionId, failure);
+ returnFuture.setException(failure);
+ } else {
+ finishCanCommit(returnFuture);
+ }
+ }
+ }, actorContext.getActorSystem().dispatcher());
+
+ return returnFuture;
+ }
+
+ private void finishCanCommit(final SettableFuture<Boolean> returnFuture) {
+
+ LOG.debug("Tx {} finishCanCommit", transactionId);
+
+ // The last phase of canCommit is to invoke all the cohort actors asynchronously to perform
+ // their canCommit processing. If any one fails then we'll fail canCommit.
Future<Iterable<Object>> combinedFuture =
invokeCohorts(new CanCommitTransaction().toSerializable());
- final SettableFuture<Boolean> returnFuture = SettableFuture.create();
-
combinedFuture.onComplete(new OnComplete<Iterable<Object>>() {
@Override
public void onComplete(Throwable failure, Iterable<Object> responses) throws Throwable {
if(failure != null) {
+ LOG.debug("Tx {}: a canCommit cohort Future failed: {}", transactionId, failure);
returnFuture.setException(failure);
return;
}
}
}
+ LOG.debug("Tx {}: canCommit returning result: {}", transactionId, result);
+
returnFuture.set(Boolean.valueOf(result));
}
}, actorContext.getActorSystem().dispatcher());
-
- return returnFuture;
}
private Future<Iterable<Object>> invokeCohorts(Object message) {
List<Future<Object>> futureList = Lists.newArrayListWithCapacity(cohortPaths.size());
for(ActorPath actorPath : cohortPaths) {
- LOG.debug("txn {} Sending {} to {}", transactionId, message, actorPath);
+ LOG.debug("Tx {}: Sending {} to cohort {}", transactionId, message, actorPath);
ActorSelection cohort = actorContext.actorSelection(actorPath);
@Override
public ListenableFuture<Void> preCommit() {
- LOG.debug("txn {} preCommit", transactionId);
- return voidOperation(new PreCommitTransaction().toSerializable(),
+ return voidOperation("preCommit", new PreCommitTransaction().toSerializable(),
PreCommitTransactionReply.SERIALIZABLE_CLASS, true);
}
@Override
public ListenableFuture<Void> abort() {
- LOG.debug("txn {} abort", transactionId);
-
// Note - we pass false for propagateException. In the front-end data broker, this method
// is called when one of the 3 phases fails with an exception. We'd rather have that
// original exception propagated to the client. If our abort fails and we propagate the
// exception then that exception will supersede and suppress the original exception. But
// it's the original exception that is the root cause and of more interest to the client.
- return voidOperation(new AbortTransaction().toSerializable(),
+ return voidOperation("abort", new AbortTransaction().toSerializable(),
AbortTransactionReply.SERIALIZABLE_CLASS, false);
}
@Override
public ListenableFuture<Void> commit() {
- LOG.debug("txn {} commit", transactionId);
- return voidOperation(new CommitTransaction().toSerializable(),
+ return voidOperation("commit", new CommitTransaction().toSerializable(),
CommitTransactionReply.SERIALIZABLE_CLASS, true);
}
- private ListenableFuture<Void> voidOperation(final Object message,
+ private ListenableFuture<Void> voidOperation(final String operationName, final Object message,
final Class<?> expectedResponseClass, final boolean propagateException) {
- Future<Iterable<Object>> combinedFuture = invokeCohorts(message);
+ LOG.debug("Tx {} {}", transactionId, operationName);
final SettableFuture<Void> returnFuture = SettableFuture.create();
+ // The cohort actor list should already be built at this point by the canCommit phase but,
+ // if not for some reason, we'll try to build it here.
+
+ if(cohortPaths != null) {
+ finishVoidOperation(operationName, message, expectedResponseClass, propagateException,
+ returnFuture);
+ } else {
+ buildCohortPathsList().onComplete(new OnComplete<Void>() {
+ @Override
+ public void onComplete(Throwable failure, Void notUsed) throws Throwable {
+ if(failure != null) {
+ LOG.debug("Tx {}: a {} cohort path Future failed: {}", transactionId,
+ operationName, failure);
+
+ if(propagateException) {
+ returnFuture.setException(failure);
+ } else {
+ returnFuture.set(null);
+ }
+ } else {
+ finishVoidOperation(operationName, message, expectedResponseClass,
+ propagateException, returnFuture);
+ }
+ }
+ }, actorContext.getActorSystem().dispatcher());
+ }
+
+ return returnFuture;
+ }
+
+ private void finishVoidOperation(final String operationName, final Object message,
+ final Class<?> expectedResponseClass, final boolean propagateException,
+ final SettableFuture<Void> returnFuture) {
+
+ LOG.debug("Tx {} finish {}", transactionId, operationName);
+
+ Future<Iterable<Object>> combinedFuture = invokeCohorts(message);
+
combinedFuture.onComplete(new OnComplete<Iterable<Object>>() {
@Override
public void onComplete(Throwable failure, Iterable<Object> responses) throws Throwable {
}
if(exceptionToPropagate != null) {
+ LOG.debug("Tx {}: a {} cohort Future failed: {}", transactionId,
+ operationName, exceptionToPropagate);
+
if(propagateException) {
// We don't log the exception here to avoid redundant logging since we're
// propagating to the caller in MD-SAL core who will log it.
returnFuture.set(null);
}
} else {
+ LOG.debug("Tx {}: {} succeeded", transactionId, operationName);
returnFuture.set(null);
}
}
}, actorContext.getActorSystem().dispatcher());
-
- return returnFuture;
}
- public List<ActorPath> getCohortPaths() {
- return Collections.unmodifiableList(this.cohortPaths);
+ @VisibleForTesting
+ List<Future<ActorPath>> getCohortPathFutures() {
+ return Collections.unmodifiableList(cohortPathFutures);
}
}
package org.opendaylight.controller.cluster.datastore;
import akka.actor.ActorPath;
-import akka.actor.ActorRef;
import akka.actor.ActorSelection;
-import akka.actor.Props;
import akka.dispatch.OnComplete;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
import com.google.common.util.concurrent.CheckedFuture;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.SettableFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import scala.Function1;
import scala.concurrent.Future;
+import scala.runtime.AbstractFunction1;
-import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
READ_WRITE
}
+ static Function1<Throwable, Throwable> SAME_FAILURE_TRANSFORMER = new AbstractFunction1<
+ Throwable, Throwable>() {
+ @Override
+ public Throwable apply(Throwable failure) {
+ return failure;
+ }
+ };
+
private static final AtomicLong counter = new AtomicLong();
private static final Logger
}
+ @VisibleForTesting
+ List<Future<Object>> getRecordedOperationFutures() {
+ List<Future<Object>> recordedOperationFutures = Lists.newArrayList();
+ for(TransactionContext transactionContext : remoteTransactionPaths.values()) {
+ recordedOperationFutures.addAll(transactionContext.getRecordedOperationFutures());
+ }
+
+ return recordedOperationFutures;
+ }
+
@Override
public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read(
final YangInstanceIdentifier path) {
Preconditions.checkState(transactionType != TransactionType.WRITE_ONLY,
"Read operation on write-only transaction is not allowed");
- LOG.debug("txn {} read {}", identifier, path);
+ LOG.debug("Tx {} read {}", identifier, path);
createTransactionIfMissing(actorContext, path);
Preconditions.checkState(transactionType != TransactionType.WRITE_ONLY,
"Exists operation on write-only transaction is not allowed");
- LOG.debug("txn {} exists {}", identifier, path);
+ LOG.debug("Tx {} exists {}", identifier, path);
createTransactionIfMissing(actorContext, path);
checkModificationState();
- LOG.debug("txn {} write {}", identifier, path);
+ LOG.debug("Tx {} write {}", identifier, path);
createTransactionIfMissing(actorContext, path);
checkModificationState();
- LOG.debug("txn {} merge {}", identifier, path);
+ LOG.debug("Tx {} merge {}", identifier, path);
createTransactionIfMissing(actorContext, path);
checkModificationState();
- LOG.debug("txn {} delete {}", identifier, path);
+ LOG.debug("Tx {} delete {}", identifier, path);
createTransactionIfMissing(actorContext, path);
inReadyState = true;
- List<ActorPath> cohortPaths = new ArrayList<>();
-
- LOG.debug("txn {} Trying to get {} transactions ready for commit", identifier,
+ LOG.debug("Tx {} Trying to get {} transactions ready for commit", identifier,
remoteTransactionPaths.size());
+ List<Future<ActorPath>> cohortPathFutures = Lists.newArrayList();
+
for(TransactionContext transactionContext : remoteTransactionPaths.values()) {
- LOG.debug("txn {} Readying transaction for shard {}", identifier,
+ LOG.debug("Tx {} Readying transaction for shard {}", identifier,
transactionContext.getShardName());
- Object result = transactionContext.readyTransaction();
-
- if(result.getClass().equals(ReadyTransactionReply.SERIALIZABLE_CLASS)){
- ReadyTransactionReply reply = ReadyTransactionReply.fromSerializable(
- actorContext.getActorSystem(),result);
- String resolvedCohortPath = transactionContext.getResolvedCohortPath(
- reply.getCohortPath().toString());
- cohortPaths.add(actorContext.actorFor(resolvedCohortPath));
- } else {
- LOG.error("Was expecting {} but got {}", ReadyTransactionReply.SERIALIZABLE_CLASS,
- result.getClass());
- }
+ cohortPathFutures.add(transactionContext.readyTransaction());
}
- return new ThreePhaseCommitCohortProxy(actorContext, cohortPaths, identifier.toString());
+ return new ThreePhaseCommitCohortProxy(actorContext, cohortPathFutures,
+ identifier.toString());
}
@Override
String transactionPath = reply.getTransactionPath();
- LOG.debug("txn {} Received transaction path = {}", identifier, transactionPath);
+ LOG.debug("Tx {} Received transaction path = {}", identifier, transactionPath);
ActorSelection transactionActor =
actorContext.actorSelection(transactionPath);
remoteTransactionPaths.put(shardName, transactionContext);
} else {
- LOG.error("Was expecting {} but got {}", CreateTransactionReply.SERIALIZABLE_CLASS,
- response.getClass());
+ throw new IllegalArgumentException(String.format(
+ "Invalid reply type {} for CreateTransaction", response.getClass()));
}
} catch(Exception e){
- LOG.error("txn {} Creating NoOpTransaction because of : {}", identifier, e.getMessage());
+ LOG.debug("Tx {} Creating NoOpTransaction because of : {}", identifier, e.getMessage());
remoteTransactionPaths.put(shardName, new NoOpTransactionContext(shardName, e));
}
}
private interface TransactionContext {
String getShardName();
- String getResolvedCohortPath(String cohortPath);
+ void closeTransaction();
- public void closeTransaction();
+ Future<ActorPath> readyTransaction();
- public Object readyTransaction();
+ void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data);
void deleteData(YangInstanceIdentifier path);
CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> readData(
final YangInstanceIdentifier path);
- void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data);
-
CheckedFuture<Boolean, ReadFailedException> dataExists(YangInstanceIdentifier path);
- }
+ List<Future<Object>> getRecordedOperationFutures();
+ }
- private class TransactionContextImpl implements TransactionContext {
- private final String shardName;
- private final String actorPath;
- private final ActorSelection actor;
+ private abstract class AbstractTransactionContext implements TransactionContext {
+ protected final String shardName;
+ protected final List<Future<Object>> recordedOperationFutures = Lists.newArrayList();
- private TransactionContextImpl(String shardName, String actorPath,
- ActorSelection actor) {
+ AbstractTransactionContext(String shardName) {
this.shardName = shardName;
- this.actorPath = actorPath;
- this.actor = actor;
}
@Override
return shardName;
}
+ @Override
+ public List<Future<Object>> getRecordedOperationFutures() {
+ return recordedOperationFutures;
+ }
+ }
+
+ private class TransactionContextImpl extends AbstractTransactionContext {
+ private final Logger LOG = LoggerFactory.getLogger(TransactionContextImpl.class);
+
+ private final String actorPath;
+ private final ActorSelection actor;
+
+ private TransactionContextImpl(String shardName, String actorPath,
+ ActorSelection actor) {
+ super(shardName);
+ this.actorPath = actorPath;
+ this.actor = actor;
+ }
+
private ActorSelection getActor() {
return actor;
}
- @Override
- public String getResolvedCohortPath(String cohortPath) {
+ private String getResolvedCohortPath(String cohortPath) {
return actorContext.resolvePath(actorPath, cohortPath);
}
@Override
public void closeTransaction() {
+ LOG.debug("Tx {} closeTransaction called", identifier);
actorContext.sendRemoteOperationAsync(getActor(), new CloseTransaction().toSerializable());
}
@Override
- public Object readyTransaction() {
- return actorContext.executeRemoteOperation(getActor(),
+ public Future<ActorPath> readyTransaction() {
+ LOG.debug("Tx {} readyTransaction called with {} previous recorded operations pending",
+ identifier, recordedOperationFutures.size());
+
+ // Send the ReadyTransaction message to the Tx actor.
+
+ final Future<Object> replyFuture = actorContext.executeRemoteOperationAsync(getActor(),
new ReadyTransaction().toSerializable(), ActorContext.ASK_DURATION);
+
+ // Combine all the previously recorded put/merge/delete operation reply Futures and the
+ // ReadyTransactionReply Future into one Future. If any one fails then the combined
+ // Future will fail. We need all prior operations and the ready operation to succeed
+ // in order to attempt commit.
+
+ List<Future<Object>> futureList =
+ Lists.newArrayListWithCapacity(recordedOperationFutures.size() + 1);
+ futureList.addAll(recordedOperationFutures);
+ futureList.add(replyFuture);
+
+ Future<Iterable<Object>> combinedFutures = akka.dispatch.Futures.sequence(futureList,
+ actorContext.getActorSystem().dispatcher());
+
+ // Transform the combined Future into a Future that returns the cohort actor path from
+ // the ReadyTransactionReply. That's the end result of the ready operation.
+
+ return combinedFutures.transform(new AbstractFunction1<Iterable<Object>, ActorPath>() {
+ @Override
+ public ActorPath apply(Iterable<Object> notUsed) {
+
+ LOG.debug("Tx {} readyTransaction: pending recorded operations succeeded",
+ identifier);
+
+ // At this point all the Futures succeeded and we need to extract the cohort
+ // actor path from the ReadyTransactionReply. For the recorded operations, they
+ // don't return any data so we're only interested that they completed
+ // successfully. We could be paranoid and verify the correct reply types but
+ // that really should never happen so it's not worth the overhead of
+ // de-serializing each reply.
+
+ // Note the Future get call here won't block as it's complete.
+ Object serializedReadyReply = replyFuture.value().get().get();
+ if(serializedReadyReply.getClass().equals(
+ ReadyTransactionReply.SERIALIZABLE_CLASS)) {
+ ReadyTransactionReply reply = ReadyTransactionReply.fromSerializable(
+ actorContext.getActorSystem(), serializedReadyReply);
+
+ String resolvedCohortPath = getResolvedCohortPath(
+ reply.getCohortPath().toString());
+
+ LOG.debug("Tx {} readyTransaction: resolved cohort path {}",
+ identifier, resolvedCohortPath);
+
+ return actorContext.actorFor(resolvedCohortPath);
+ } else {
+ // Throwing an exception here will fail the Future.
+
+ throw new IllegalArgumentException(String.format("Invalid reply type {}",
+ serializedReadyReply.getClass()));
+ }
+ }
+ }, SAME_FAILURE_TRANSFORMER, actorContext.getActorSystem().dispatcher());
}
@Override
public void deleteData(YangInstanceIdentifier path) {
- actorContext.sendRemoteOperationAsync(getActor(), new DeleteData(path).toSerializable() );
+ LOG.debug("Tx {} deleteData called path = {}", identifier, path);
+ recordedOperationFutures.add(actorContext.executeRemoteOperationAsync(getActor(),
+ new DeleteData(path).toSerializable(), ActorContext.ASK_DURATION ));
}
@Override
public void mergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
- actorContext.sendRemoteOperationAsync(getActor(),
- new MergeData(path, data, schemaContext).toSerializable());
+ LOG.debug("Tx {} mergeData called path = {}", identifier, path);
+ recordedOperationFutures.add(actorContext.executeRemoteOperationAsync(getActor(),
+ new MergeData(path, data, schemaContext).toSerializable(),
+ ActorContext.ASK_DURATION));
+ }
+
+ @Override
+ public void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
+ LOG.debug("Tx {} writeData called path = {}", identifier, path);
+ recordedOperationFutures.add(actorContext.executeRemoteOperationAsync(getActor(),
+ new WriteData(path, data, schemaContext).toSerializable(),
+ ActorContext.ASK_DURATION));
}
@Override
public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> readData(
- final YangInstanceIdentifier path) {
+ final YangInstanceIdentifier path) {
+
+ LOG.debug("Tx {} readData called path = {}", identifier, path);
final SettableFuture<Optional<NormalizedNode<?, ?>>> returnFuture = SettableFuture.create();
+ // If there were any previous recorded put/merge/delete operation reply Futures then we
+ // must wait for them to successfully complete. This is necessary to honor the read
+ // uncommitted semantics of the public API contract. If any one fails then fail the read.
+
+ if(recordedOperationFutures.isEmpty()) {
+ finishReadData(path, returnFuture);
+ } else {
+ LOG.debug("Tx {} readData: verifying {} previous recorded operations",
+ identifier, recordedOperationFutures.size());
+
+ // Note: we make a copy of recordedOperationFutures to be on the safe side in case
+ // Futures#sequence accesses the passed List on a different thread, as
+ // recordedOperationFutures is not synchronized.
+
+ Future<Iterable<Object>> combinedFutures = akka.dispatch.Futures.sequence(
+ Lists.newArrayList(recordedOperationFutures),
+ actorContext.getActorSystem().dispatcher());
+ OnComplete<Iterable<Object>> onComplete = new OnComplete<Iterable<Object>>() {
+ @Override
+ public void onComplete(Throwable failure, Iterable<Object> notUsed)
+ throws Throwable {
+ if(failure != null) {
+ LOG.debug("Tx {} readData: a recorded operation failed: {}",
+ identifier, failure);
+
+ returnFuture.setException(new ReadFailedException(
+ "The read could not be performed because a previous put, merge,"
+ + "or delete operation failed", failure));
+ } else {
+ finishReadData(path, returnFuture);
+ }
+ }
+ };
+
+ combinedFutures.onComplete(onComplete, actorContext.getActorSystem().dispatcher());
+ }
+
+ return MappingCheckedFuture.create(returnFuture, ReadFailedException.MAPPER);
+ }
+
+ private void finishReadData(final YangInstanceIdentifier path,
+ final SettableFuture<Optional<NormalizedNode<?, ?>>> returnFuture) {
+
+ LOG.debug("Tx {} finishReadData called path = {}", identifier, path);
+
OnComplete<Object> onComplete = new OnComplete<Object>() {
@Override
- public void onComplete(Throwable failure, Object response) throws Throwable {
+ public void onComplete(Throwable failure, Object readResponse) throws Throwable {
if(failure != null) {
+ LOG.debug("Tx {} read operation failed: {}", identifier, failure);
+
returnFuture.setException(new ReadFailedException(
"Error reading data for path " + path, failure));
} else {
- if (response.getClass().equals(ReadDataReply.SERIALIZABLE_CLASS)) {
+ LOG.debug("Tx {} read operation succeeded", identifier, failure);
+
+ if (readResponse.getClass().equals(ReadDataReply.SERIALIZABLE_CLASS)) {
ReadDataReply reply = ReadDataReply.fromSerializable(schemaContext,
- path, response);
+ path, readResponse);
if (reply.getNormalizedNode() == null) {
returnFuture.set(Optional.<NormalizedNode<?, ?>>absent());
} else {
}
};
- Future<Object> future = actorContext.executeRemoteOperationAsync(getActor(),
+ Future<Object> readFuture = actorContext.executeRemoteOperationAsync(getActor(),
new ReadData(path).toSerializable(), ActorContext.ASK_DURATION);
- future.onComplete(onComplete, actorContext.getActorSystem().dispatcher());
-
- return MappingCheckedFuture.create(returnFuture, ReadFailedException.MAPPER);
- }
-
- @Override
- public void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
- actorContext.sendRemoteOperationAsync(getActor(),
- new WriteData(path, data, schemaContext).toSerializable());
+ readFuture.onComplete(onComplete, actorContext.getActorSystem().dispatcher());
}
@Override
public CheckedFuture<Boolean, ReadFailedException> dataExists(
final YangInstanceIdentifier path) {
+ LOG.debug("Tx {} dataExists called path = {}", identifier, path);
+
final SettableFuture<Boolean> returnFuture = SettableFuture.create();
+ // If there were any previous recorded put/merge/delete operation reply Futures then we
+ // must wait for them to successfully complete. This is necessary to honor the read
+ // uncommitted semantics of the public API contract. If any one fails then fail this
+ // request.
+
+ if(recordedOperationFutures.isEmpty()) {
+ finishDataExists(path, returnFuture);
+ } else {
+ LOG.debug("Tx {} dataExists: verifying {} previous recorded operations",
+ identifier, recordedOperationFutures.size());
+
+ // Note: we make a copy of recordedOperationFutures to be on the safe side in case
+ // Futures#sequence accesses the passed List on a different thread, as
+ // recordedOperationFutures is not synchronized.
+
+ Future<Iterable<Object>> combinedFutures = akka.dispatch.Futures.sequence(
+ Lists.newArrayList(recordedOperationFutures),
+ actorContext.getActorSystem().dispatcher());
+ OnComplete<Iterable<Object>> onComplete = new OnComplete<Iterable<Object>>() {
+ @Override
+ public void onComplete(Throwable failure, Iterable<Object> notUsed)
+ throws Throwable {
+ if(failure != null) {
+ LOG.debug("Tx {} dataExists: a recorded operation failed: {}",
+ identifier, failure);
+
+ returnFuture.setException(new ReadFailedException(
+ "The data exists could not be performed because a previous "
+ + "put, merge, or delete operation failed", failure));
+ } else {
+ finishDataExists(path, returnFuture);
+ }
+ }
+ };
+
+ combinedFutures.onComplete(onComplete, actorContext.getActorSystem().dispatcher());
+ }
+
+ return MappingCheckedFuture.create(returnFuture, ReadFailedException.MAPPER);
+ }
+
+ private void finishDataExists(final YangInstanceIdentifier path,
+ final SettableFuture<Boolean> returnFuture) {
+
+ LOG.debug("Tx {} finishDataExists called path = {}", identifier, path);
+
OnComplete<Object> onComplete = new OnComplete<Object>() {
@Override
public void onComplete(Throwable failure, Object response) throws Throwable {
if(failure != null) {
+ LOG.debug("Tx {} dataExists operation failed: {}", identifier, failure);
+
returnFuture.setException(new ReadFailedException(
- "Error checking exists for path " + path, failure));
+ "Error checking data exists for path " + path, failure));
} else {
+ LOG.debug("Tx {} dataExists operation succeeded", identifier, failure);
+
if (response.getClass().equals(DataExistsReply.SERIALIZABLE_CLASS)) {
returnFuture.set(Boolean.valueOf(DataExistsReply.
fromSerializable(response).exists()));
Future<Object> future = actorContext.executeRemoteOperationAsync(getActor(),
new DataExists(path).toSerializable(), ActorContext.ASK_DURATION);
future.onComplete(onComplete, actorContext.getActorSystem().dispatcher());
-
- return MappingCheckedFuture.create(returnFuture, ReadFailedException.MAPPER);
}
}
- private class NoOpTransactionContext implements TransactionContext {
+ private class NoOpTransactionContext extends AbstractTransactionContext {
- private final Logger
- LOG = LoggerFactory.getLogger(NoOpTransactionContext.class);
+ private final Logger LOG = LoggerFactory.getLogger(NoOpTransactionContext.class);
- private final String shardName;
private final Exception failure;
- private ActorRef cohort;
-
public NoOpTransactionContext(String shardName, Exception failure){
- this.shardName = shardName;
+ super(shardName);
this.failure = failure;
}
@Override
- public String getShardName() {
- return shardName;
-
+ public void closeTransaction() {
+ LOG.debug("NoOpTransactionContext {} closeTransaction called", identifier);
}
@Override
- public String getResolvedCohortPath(String cohortPath) {
- return cohort.path().toString();
+ public Future<ActorPath> readyTransaction() {
+ LOG.debug("Tx {} readyTransaction called", identifier);
+ return akka.dispatch.Futures.failed(failure);
}
@Override
- public void closeTransaction() {
- LOG.warn("txn {} closeTransaction called", identifier);
- }
-
- @Override public Object readyTransaction() {
- LOG.warn("txn {} readyTransaction called", identifier);
- cohort = actorContext.getActorSystem().actorOf(Props.create(NoOpCohort.class));
- return new ReadyTransactionReply(cohort.path()).toSerializable();
+ public void deleteData(YangInstanceIdentifier path) {
+ LOG.debug("Tx {} deleteData called path = {}", identifier, path);
}
@Override
- public void deleteData(YangInstanceIdentifier path) {
- LOG.warn("txt {} deleteData called path = {}", identifier, path);
+ public void mergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
+ LOG.debug("Tx {} mergeData called path = {}", identifier, path);
}
@Override
- public void mergeData(YangInstanceIdentifier path,
- NormalizedNode<?, ?> data) {
- LOG.warn("txn {} mergeData called path = {}", identifier, path);
+ public void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
+ LOG.debug("Tx {} writeData called path = {}", identifier, path);
}
@Override
public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> readData(
YangInstanceIdentifier path) {
- LOG.warn("txn {} readData called path = {}", identifier, path);
+ LOG.debug("Tx {} readData called path = {}", identifier, path);
return Futures.immediateFailedCheckedFuture(new ReadFailedException(
"Error reading data for path " + path, failure));
}
- @Override public void writeData(YangInstanceIdentifier path,
- NormalizedNode<?, ?> data) {
- LOG.warn("txn {} writeData called path = {}", identifier, path);
- }
-
- @Override public CheckedFuture<Boolean, ReadFailedException> dataExists(
+ @Override
+ public CheckedFuture<Boolean, ReadFailedException> dataExists(
YangInstanceIdentifier path) {
- LOG.warn("txn {} dataExists called path = {}", identifier, path);
+ LOG.debug("Tx {} dataExists called path = {}", identifier, path);
return Futures.immediateFailedCheckedFuture(new ReadFailedException(
"Error checking exists for path " + path, failure));
}
}
-
-
-
}
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ListenableFuture;
+
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.isA;
import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.SerializableMessage;
import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
-import org.opendaylight.controller.cluster.datastore.utils.MessageCollectorActor;
+import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor;
+
+import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;
import java.util.List;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest {
+ @SuppressWarnings("serial")
+ static class TestException extends RuntimeException {
+ }
+
@Mock
private ActorContext actorContext;
doReturn(getSystem()).when(actorContext).getActorSystem();
}
- private ThreePhaseCommitCohortProxy setupProxy(int nCohorts) {
- List<ActorPath> cohorts = Lists.newArrayList();
+ private Future<ActorPath> newCohortPath() {
+ ActorPath path = getSystem().actorOf(Props.create(DoNothingActor.class)).path();
+ doReturn(mock(ActorSelection.class)).when(actorContext).actorSelection(path);
+ return Futures.successful(path);
+ }
+
+ private final ThreePhaseCommitCohortProxy setupProxy(int nCohorts) throws Exception {
+ List<Future<ActorPath>> cohortPathFutures = Lists.newArrayList();
for(int i = 1; i <= nCohorts; i++) {
- ActorPath path = getSystem().actorOf(Props.create(MessageCollectorActor.class)).path();
- cohorts.add(path);
- doReturn(mock(ActorSelection.class)).when(actorContext).actorSelection(path);
+ cohortPathFutures.add(newCohortPath());
}
- return new ThreePhaseCommitCohortProxy(actorContext, cohorts, "txn-1");
+ return new ThreePhaseCommitCohortProxy(actorContext, cohortPathFutures, "txn-1");
+ }
+
+ private ThreePhaseCommitCohortProxy setupProxyWithFailedCohortPath()
+ throws Exception {
+ List<Future<ActorPath>> cohortPathFutures = Lists.newArrayList();
+ cohortPathFutures.add(newCohortPath());
+ cohortPathFutures.add(Futures.<ActorPath>failed(new TestException()));
+
+ return new ThreePhaseCommitCohortProxy(actorContext, cohortPathFutures, "txn-1");
}
private void setupMockActorContext(Class<?> requestType, Object... responses) {
any(ActorSelection.class), isA(requestType), any(FiniteDuration.class));
}
+ private void propagateExecutionExceptionCause(ListenableFuture<?> future) throws Throwable {
+
+ try {
+ future.get(5, TimeUnit.SECONDS);
+ fail("Expected ExecutionException");
+ } catch(ExecutionException e) {
+ throw e.getCause();
+ }
+ }
+
@Test
public void testCanCommitWithOneCohort() throws Exception {
ListenableFuture<Boolean> future = proxy.canCommit();
- assertEquals("canCommit", true, future.get());
+ assertEquals("canCommit", true, future.get(5, TimeUnit.SECONDS));
setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS,
new CanCommitTransactionReply(false));
future = proxy.canCommit();
- assertEquals("canCommit", false, future.get());
+ assertEquals("canCommit", false, future.get(5, TimeUnit.SECONDS));
verifyCohortInvocations(2, CanCommitTransaction.SERIALIZABLE_CLASS);
}
ListenableFuture<Boolean> future = proxy.canCommit();
- assertEquals("canCommit", true, future.get());
+ assertEquals("canCommit", true, future.get(5, TimeUnit.SECONDS));
verifyCohortInvocations(2, CanCommitTransaction.SERIALIZABLE_CLASS);
}
ListenableFuture<Boolean> future = proxy.canCommit();
- assertEquals("canCommit", false, future.get());
+ assertEquals("canCommit", false, future.get(5, TimeUnit.SECONDS));
verifyCohortInvocations(3, CanCommitTransaction.SERIALIZABLE_CLASS);
}
- @Test(expected = ExecutionException.class)
- public void testCanCommitWithExceptionFailure() throws Exception {
+ @Test(expected = TestException.class)
+ public void testCanCommitWithExceptionFailure() throws Throwable {
ThreePhaseCommitCohortProxy proxy = setupProxy(1);
- setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS, new RuntimeException("mock"));
+ setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS, new TestException());
- proxy.canCommit().get();
+ propagateExecutionExceptionCause(proxy.canCommit());
}
@Test(expected = ExecutionException.class)
setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS,
new PreCommitTransactionReply());
- proxy.canCommit().get();
+ proxy.canCommit().get(5, TimeUnit.SECONDS);
+ }
+
+ @Test(expected = TestException.class)
+ public void testCanCommitWithFailedCohortPath() throws Throwable {
+
+ ThreePhaseCommitCohortProxy proxy = setupProxyWithFailedCohortPath();
+
+ try {
+ propagateExecutionExceptionCause(proxy.canCommit());
+ } finally {
+ verifyCohortInvocations(0, CanCommitTransaction.SERIALIZABLE_CLASS);
+ }
}
@Test
setupMockActorContext(PreCommitTransaction.SERIALIZABLE_CLASS,
new PreCommitTransactionReply());
- proxy.preCommit().get();
+ proxy.preCommit().get(5, TimeUnit.SECONDS);
verifyCohortInvocations(1, PreCommitTransaction.SERIALIZABLE_CLASS);
}
setupMockActorContext(PreCommitTransaction.SERIALIZABLE_CLASS,
new PreCommitTransactionReply(), new RuntimeException("mock"));
- proxy.preCommit().get();
+ proxy.preCommit().get(5, TimeUnit.SECONDS);
}
@Test
setupMockActorContext(AbortTransaction.SERIALIZABLE_CLASS, new AbortTransactionReply());
- proxy.abort().get();
+ proxy.abort().get(5, TimeUnit.SECONDS);
verifyCohortInvocations(1, AbortTransaction.SERIALIZABLE_CLASS);
}
setupMockActorContext(AbortTransaction.SERIALIZABLE_CLASS, new RuntimeException("mock"));
// The exception should not get propagated.
- proxy.abort().get();
+ proxy.abort().get(5, TimeUnit.SECONDS);
verifyCohortInvocations(1, AbortTransaction.SERIALIZABLE_CLASS);
}
+ @Test
+ public void testAbortWithFailedCohortPath() throws Throwable {
+
+ ThreePhaseCommitCohortProxy proxy = setupProxyWithFailedCohortPath();
+
+ // The exception should not get propagated.
+ proxy.abort().get(5, TimeUnit.SECONDS);
+
+ verifyCohortInvocations(0, AbortTransaction.SERIALIZABLE_CLASS);
+ }
+
@Test
public void testCommit() throws Exception {
setupMockActorContext(CommitTransaction.SERIALIZABLE_CLASS, new CommitTransactionReply(),
new CommitTransactionReply());
- proxy.commit().get();
+ proxy.commit().get(5, TimeUnit.SECONDS);
verifyCohortInvocations(2, CommitTransaction.SERIALIZABLE_CLASS);
}
- @Test(expected = ExecutionException.class)
- public void testCommitWithFailure() throws Exception {
+ @Test(expected = TestException.class)
+ public void testCommitWithFailure() throws Throwable {
ThreePhaseCommitCohortProxy proxy = setupProxy(2);
setupMockActorContext(CommitTransaction.SERIALIZABLE_CLASS, new CommitTransactionReply(),
- new RuntimeException("mock"));
+ new TestException());
- proxy.commit().get();
+ propagateExecutionExceptionCause(proxy.commit());
}
@Test(expected = ExecutionException.class)
- public void teseCommitWithInvalidResponseType() throws Exception {
+ public void testCommitWithInvalidResponseType() throws Exception {
ThreePhaseCommitCohortProxy proxy = setupProxy(1);
setupMockActorContext(CommitTransaction.SERIALIZABLE_CLASS, new PreCommitTransactionReply());
- proxy.commit().get();
+ proxy.commit().get(5, TimeUnit.SECONDS);
+ }
+
+ @Test(expected = TestException.class)
+ public void testCommitWithFailedCohortPath() throws Throwable {
+
+ ThreePhaseCommitCohortProxy proxy = setupProxyWithFailedCohortPath();
+
+ try {
+ propagateExecutionExceptionCause(proxy.commit());
+ } finally {
+ verifyCohortInvocations(0, CommitTransaction.SERIALIZABLE_CLASS);
+ }
}
@Test
- public void testGetCohortPaths() {
+ public void testAllThreePhasesSuccessful() throws Exception {
ThreePhaseCommitCohortProxy proxy = setupProxy(2);
- List<ActorPath> paths = proxy.getCohortPaths();
- assertNotNull("getCohortPaths returned null", paths);
- assertEquals("getCohortPaths size", 2, paths.size());
+ setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS,
+ new CanCommitTransactionReply(true), new CanCommitTransactionReply(true));
+
+ setupMockActorContext(PreCommitTransaction.SERIALIZABLE_CLASS,
+ new PreCommitTransactionReply(), new PreCommitTransactionReply());
+
+ setupMockActorContext(CommitTransaction.SERIALIZABLE_CLASS,
+ new CommitTransactionReply(), new CommitTransactionReply());
+
+ proxy.canCommit().get(5, TimeUnit.SECONDS);
+ proxy.preCommit().get(5, TimeUnit.SECONDS);
+ proxy.commit().get(5, TimeUnit.SECONDS);
+
+ verifyCohortInvocations(2, CanCommitTransaction.SERIALIZABLE_CLASS);
+ verifyCohortInvocations(2, PreCommitTransaction.SERIALIZABLE_CLASS);
+ verifyCohortInvocations(2, CommitTransaction.SERIALIZABLE_CLASS);
}
}
import akka.actor.ActorSelection;
import akka.actor.Props;
import akka.dispatch.Futures;
+
import com.google.common.base.Optional;
+import com.google.common.util.concurrent.CheckedFuture;
import org.junit.Before;
import org.junit.Test;
import org.opendaylight.controller.cluster.datastore.messages.DataExists;
import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
+import org.opendaylight.controller.cluster.datastore.messages.DeleteDataReply;
import org.opendaylight.controller.cluster.datastore.messages.MergeData;
+import org.opendaylight.controller.cluster.datastore.messages.MergeDataReply;
import org.opendaylight.controller.cluster.datastore.messages.ReadData;
import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.WriteData;
+import org.opendaylight.controller.cluster.datastore.messages.WriteDataReply;
import org.opendaylight.controller.cluster.datastore.shardstrategy.DefaultShardStrategy;
import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import scala.concurrent.Await;
import scala.concurrent.Future;
+import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;
-import java.util.Arrays;
+import java.util.List;
import java.util.concurrent.TimeUnit;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.isA;
+import static org.mockito.Mockito.times;
@SuppressWarnings("resource")
public class TransactionProxyTest extends AbstractActorTest {
}
static interface Invoker {
- void invoke(TransactionProxy proxy) throws Exception;
+ CheckedFuture<?, ReadFailedException> invoke(TransactionProxy proxy) throws Exception;
}
private final Configuration configuration = new MockConfiguration();
schemaContext = TestModel.createTestContext();
doReturn(getSystem()).when(mockActorContext).getActorSystem();
+ doReturn(memberName).when(mockActorContext).getCurrentMemberName();
ShardStrategyFactory.setConfiguration(configuration);
}
ArgumentMatcher<DataExists> matcher = new ArgumentMatcher<DataExists>() {
@Override
public boolean matches(Object argument) {
- DataExists obj = DataExists.fromSerializable(argument);
- return obj.getPath().equals(TestModel.TEST_PATH);
+ return DataExists.SERIALIZABLE_CLASS.equals(argument.getClass()) &&
+ DataExists.fromSerializable(argument).getPath().equals(TestModel.TEST_PATH);
}
};
ArgumentMatcher<ReadData> matcher = new ArgumentMatcher<ReadData>() {
@Override
public boolean matches(Object argument) {
- ReadData obj = ReadData.fromSerializable(argument);
- return obj.getPath().equals(TestModel.TEST_PATH);
+ return ReadData.SERIALIZABLE_CLASS.equals(argument.getClass()) &&
+ ReadData.fromSerializable(argument).getPath().equals(TestModel.TEST_PATH);
}
};
ArgumentMatcher<WriteData> matcher = new ArgumentMatcher<WriteData>() {
@Override
public boolean matches(Object argument) {
+ if(!WriteData.SERIALIZABLE_CLASS.equals(argument.getClass())) {
+ return false;
+ }
+
WriteData obj = WriteData.fromSerializable(argument, schemaContext);
return obj.getPath().equals(TestModel.TEST_PATH) &&
obj.getData().equals(nodeToWrite);
ArgumentMatcher<MergeData> matcher = new ArgumentMatcher<MergeData>() {
@Override
public boolean matches(Object argument) {
+ if(!MergeData.SERIALIZABLE_CLASS.equals(argument.getClass())) {
+ return false;
+ }
+
MergeData obj = MergeData.fromSerializable(argument, schemaContext);
return obj.getPath().equals(TestModel.TEST_PATH) &&
obj.getData().equals(nodeToWrite);
ArgumentMatcher<DeleteData> matcher = new ArgumentMatcher<DeleteData>() {
@Override
public boolean matches(Object argument) {
- DeleteData obj = DeleteData.fromSerializable(argument);
- return obj.getPath().equals(TestModel.TEST_PATH);
+ return DeleteData.SERIALIZABLE_CLASS.equals(argument.getClass()) &&
+ DeleteData.fromSerializable(argument).getPath().equals(TestModel.TEST_PATH);
}
};
return argThat(matcher);
}
- private Object readyTxReply(ActorPath path) {
- return new ReadyTransactionReply(path).toSerializable();
+ private Future<Object> readyTxReply(ActorPath path) {
+ return Futures.successful((Object)new ReadyTransactionReply(path).toSerializable());
}
private Future<Object> readDataReply(NormalizedNode<?, ?> data) {
- return Futures.successful(new ReadDataReply(schemaContext, data)
- .toSerializable());
+ return Futures.successful(new ReadDataReply(schemaContext, data).toSerializable());
}
private Future<Object> dataExistsReply(boolean exists) {
return Futures.successful(new DataExistsReply(exists).toSerializable());
}
+ private Future<Object> writeDataReply() {
+ return Futures.successful(new WriteDataReply().toSerializable());
+ }
+
+ private Future<Object> mergeDataReply() {
+ return Futures.successful(new MergeDataReply().toSerializable());
+ }
+
+ private Future<Object> deleteDataReply() {
+ return Futures.successful(new DeleteDataReply().toSerializable());
+ }
+
private ActorSelection actorSelection(ActorRef actorRef) {
return getSystem().actorSelection(actorRef.path());
}
ActorRef actorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
doReturn(getSystem().actorSelection(actorRef.path())).
when(mockActorContext).actorSelection(actorRef.path().toString());
- doReturn(memberName).when(mockActorContext).getCurrentMemberName();
doReturn(createTransactionReply(actorRef)).when(mockActorContext).
executeShardOperation(eq(DefaultShardStrategy.DEFAULT_SHARD),
eqCreateTransaction(memberName, type), anyDuration());
return actorRef;
}
+ private void propagateReadFailedExceptionCause(CheckedFuture<?, ReadFailedException> future)
+ throws Throwable {
+
+ try {
+ future.checkedGet(5, TimeUnit.SECONDS);
+ fail("Expected ReadFailedException");
+ } catch(ReadFailedException e) {
+ throw e.getCause();
+ }
+ }
+
@Test
public void testRead() throws Exception {
ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_ONLY);
}
@Test(expected = ReadFailedException.class)
- public void testReadWhenAnInvalidMessageIsSentInReply() throws Exception {
+ public void testReadWithInvalidReplyMessageType() throws Exception {
setupActorContextWithInitialCreateTransaction(READ_ONLY);
doReturn(Futures.successful(new Object())).when(mockActorContext).
public void testReadWithAsyncRemoteOperatonFailure() throws Throwable {
setupActorContextWithInitialCreateTransaction(READ_ONLY);
- doThrow(new TestException()).when(mockActorContext).
+ doReturn(Futures.failed(new TestException())).when(mockActorContext).
executeRemoteOperationAsync(any(ActorSelection.class), any(), anyDuration());
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
READ_ONLY, schemaContext);
- try {
- transactionProxy.read(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS);
- fail("Expected ReadFailedException");
- } catch(ReadFailedException e) {
- // Expected - throw cause - expects TestException.
- throw e.getCause();
- }
+ propagateReadFailedExceptionCause(transactionProxy.read(TestModel.TEST_PATH));
}
private void testExceptionOnInitialCreateTransaction(Exception exToThrow, Invoker invoker)
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
READ_ONLY, schemaContext);
- try {
- invoker.invoke(transactionProxy);
- fail("Expected ReadFailedException");
- } catch(ReadFailedException e) {
- // Expected - throw cause - expects TestException.
- throw e.getCause();
- }
+ propagateReadFailedExceptionCause(invoker.invoke(transactionProxy));
}
private void testReadWithExceptionOnInitialCreateTransaction(Exception exToThrow) throws Throwable {
testExceptionOnInitialCreateTransaction(exToThrow, new Invoker() {
@Override
- public void invoke(TransactionProxy proxy) throws Exception {
- proxy.read(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS);
+ public CheckedFuture<?, ReadFailedException> invoke(TransactionProxy proxy) throws Exception {
+ return proxy.read(TestModel.TEST_PATH);
}
});
}
testReadWithExceptionOnInitialCreateTransaction(new TestException());
}
+ @Test(expected = TestException.class)
+ public void testReadWithPriorRecordingOperationFailure() throws Throwable {
+ ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_WRITE);
+
+ NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+
+ doReturn(writeDataReply()).when(mockActorContext).executeRemoteOperationAsync(
+ eq(actorSelection(actorRef)), eqWriteData(nodeToWrite), anyDuration());
+
+ doReturn(Futures.failed(new TestException())).when(mockActorContext).
+ executeRemoteOperationAsync(eq(actorSelection(actorRef)), eqDeleteData(),
+ anyDuration());
+
+ doReturn(readDataReply(null)).when(mockActorContext).executeRemoteOperationAsync(
+ eq(actorSelection(actorRef)), eqReadData(), anyDuration());
+
+ TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
+ READ_WRITE, schemaContext);
+
+ transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
+
+ transactionProxy.delete(TestModel.TEST_PATH);
+
+ try {
+ propagateReadFailedExceptionCause(transactionProxy.read(TestModel.TEST_PATH));
+ } finally {
+ verify(mockActorContext, times(0)).executeRemoteOperationAsync(
+ eq(actorSelection(actorRef)), eqReadData(), anyDuration());
+ }
+ }
+
+ @Test
+ public void testReadWithPriorRecordingOperationSuccessful() throws Throwable {
+ ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_WRITE);
+
+ NormalizedNode<?, ?> expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+
+ doReturn(writeDataReply()).when(mockActorContext).executeRemoteOperationAsync(
+ eq(actorSelection(actorRef)), eqWriteData(expectedNode), anyDuration());
+
+ doReturn(readDataReply(expectedNode)).when(mockActorContext).executeRemoteOperationAsync(
+ eq(actorSelection(actorRef)), eqReadData(), anyDuration());
+
+ TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
+ READ_WRITE, schemaContext);
+
+ transactionProxy.write(TestModel.TEST_PATH, expectedNode);
+
+ Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(
+ TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
+
+ assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
+
+ assertEquals("Response NormalizedNode", expectedNode, readOptional.get());
+ }
+
+ @Test(expected=IllegalStateException.class)
+ public void testReadPreConditionCheck() {
+
+ TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
+ WRITE_ONLY, schemaContext);
+
+ transactionProxy.read(TestModel.TEST_PATH);
+ }
+
@Test
public void testExists() throws Exception {
ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_ONLY);
public void testExistsWhenAPrimaryNotFoundExceptionIsThrown() throws Throwable {
testExceptionOnInitialCreateTransaction(new PrimaryNotFoundException("test"), new Invoker() {
@Override
- public void invoke(TransactionProxy proxy) throws Exception {
- proxy.exists(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS);
+ public CheckedFuture<?, ReadFailedException> invoke(TransactionProxy proxy) throws Exception {
+ return proxy.exists(TestModel.TEST_PATH);
}
});
}
@Test(expected = ReadFailedException.class)
- public void testExistsWhenAnInvalidMessageIsSentInReply() throws Exception {
+ public void testExistsWithInvalidReplyMessageType() throws Exception {
setupActorContextWithInitialCreateTransaction(READ_ONLY);
doReturn(Futures.successful(new Object())).when(mockActorContext).
public void testExistsWithAsyncRemoteOperatonFailure() throws Throwable {
setupActorContextWithInitialCreateTransaction(READ_ONLY);
- doThrow(new TestException()).when(mockActorContext).
+ doReturn(Futures.failed(new TestException())).when(mockActorContext).
executeRemoteOperationAsync(any(ActorSelection.class), any(), anyDuration());
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
READ_ONLY, schemaContext);
+ propagateReadFailedExceptionCause(transactionProxy.exists(TestModel.TEST_PATH));
+ }
+
+ @Test(expected = TestException.class)
+ public void testExistsWithPriorRecordingOperationFailure() throws Throwable {
+ ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_WRITE);
+
+ NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+
+ doReturn(writeDataReply()).when(mockActorContext).executeRemoteOperationAsync(
+ eq(actorSelection(actorRef)), eqWriteData(nodeToWrite), anyDuration());
+
+ doReturn(Futures.failed(new TestException())).when(mockActorContext).
+ executeRemoteOperationAsync(eq(actorSelection(actorRef)), eqDeleteData(),
+ anyDuration());
+
+ doReturn(dataExistsReply(false)).when(mockActorContext).executeRemoteOperationAsync(
+ eq(actorSelection(actorRef)), eqDataExists(), anyDuration());
+
+ TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
+ READ_WRITE, schemaContext);
+
+ transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
+
+ transactionProxy.delete(TestModel.TEST_PATH);
+
try {
- transactionProxy.exists(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS);
- fail("Expected ReadFailedException");
- } catch(ReadFailedException e) {
- // Expected - throw cause - expects TestException.
- throw e.getCause();
+ propagateReadFailedExceptionCause(transactionProxy.exists(TestModel.TEST_PATH));
+ } finally {
+ verify(mockActorContext, times(0)).executeRemoteOperationAsync(
+ eq(actorSelection(actorRef)), eqDataExists(), anyDuration());
}
}
@Test
- public void testWrite() throws Exception {
- ActorRef actorRef = setupActorContextWithInitialCreateTransaction(WRITE_ONLY);
+ public void testExistsWithPriorRecordingOperationSuccessful() throws Throwable {
+ ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_WRITE);
+
+ NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+
+ doReturn(writeDataReply()).when(mockActorContext).executeRemoteOperationAsync(
+ eq(actorSelection(actorRef)), eqWriteData(nodeToWrite), anyDuration());
+
+ doReturn(dataExistsReply(true)).when(mockActorContext).executeRemoteOperationAsync(
+ eq(actorSelection(actorRef)), eqDataExists(), anyDuration());
+
+ TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
+ READ_WRITE, schemaContext);
+
+ transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
+
+ Boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
+
+ assertEquals("Exists response", true, exists);
+ }
+
+ @Test(expected=IllegalStateException.class)
+ public void testxistsPreConditionCheck() {
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
WRITE_ONLY, schemaContext);
+ transactionProxy.exists(TestModel.TEST_PATH);
+ }
+
+ private void verifyRecordingOperationFutures(List<Future<Object>> futures,
+ Class<?>... expResultTypes) throws Exception {
+ assertEquals("getRecordingOperationFutures size", expResultTypes.length, futures.size());
+
+ int i = 0;
+ for( Future<Object> future: futures) {
+ assertNotNull("Recording operation Future is null", future);
+
+ Class<?> expResultType = expResultTypes[i++];
+ if(Throwable.class.isAssignableFrom(expResultType)) {
+ try {
+ Await.result(future, Duration.create(5, TimeUnit.SECONDS));
+ fail("Expected exception from recording operation Future");
+ } catch(Exception e) {
+ // Expected
+ }
+ } else {
+ assertEquals("Recording operation Future result type", expResultType,
+ Await.result(future, Duration.create(5, TimeUnit.SECONDS)).getClass());
+ }
+ }
+ }
+
+ @Test
+ public void testWrite() throws Exception {
+ ActorRef actorRef = setupActorContextWithInitialCreateTransaction(WRITE_ONLY);
+
NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+ doReturn(writeDataReply()).when(mockActorContext).executeRemoteOperationAsync(
+ eq(actorSelection(actorRef)), eqWriteData(nodeToWrite), anyDuration());
+
+ TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
+ WRITE_ONLY, schemaContext);
+
transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
- verify(mockActorContext).sendRemoteOperationAsync(
- eq(actorSelection(actorRef)), eqWriteData(nodeToWrite));
+ verify(mockActorContext).executeRemoteOperationAsync(
+ eq(actorSelection(actorRef)), eqWriteData(nodeToWrite), anyDuration());
+
+ verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
+ WriteDataReply.SERIALIZABLE_CLASS);
+ }
+
+ @Test(expected=IllegalStateException.class)
+ public void testWritePreConditionCheck() {
+
+ TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
+ READ_ONLY, schemaContext);
+
+ transactionProxy.write(TestModel.TEST_PATH,
+ ImmutableNodes.containerNode(TestModel.TEST_QNAME));
+ }
+
+ @Test(expected=IllegalStateException.class)
+ public void testWriteAfterReadyPreConditionCheck() {
+
+ TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
+ WRITE_ONLY, schemaContext);
+
+ transactionProxy.ready();
+
+ transactionProxy.write(TestModel.TEST_PATH,
+ ImmutableNodes.containerNode(TestModel.TEST_QNAME));
}
@Test
public void testMerge() throws Exception {
ActorRef actorRef = setupActorContextWithInitialCreateTransaction(WRITE_ONLY);
+ NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+
+ doReturn(mergeDataReply()).when(mockActorContext).executeRemoteOperationAsync(
+ eq(actorSelection(actorRef)), eqMergeData(nodeToWrite), anyDuration());
+
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
WRITE_ONLY, schemaContext);
- NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
-
transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
- verify(mockActorContext).sendRemoteOperationAsync(
- eq(actorSelection(actorRef)), eqMergeData(nodeToWrite));
+ verify(mockActorContext).executeRemoteOperationAsync(
+ eq(actorSelection(actorRef)), eqMergeData(nodeToWrite), anyDuration());
+
+ verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
+ MergeDataReply.SERIALIZABLE_CLASS);
}
@Test
public void testDelete() throws Exception {
ActorRef actorRef = setupActorContextWithInitialCreateTransaction(WRITE_ONLY);
+ doReturn(deleteDataReply()).when(mockActorContext).executeRemoteOperationAsync(
+ eq(actorSelection(actorRef)), eqDeleteData(), anyDuration());
+
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
WRITE_ONLY, schemaContext);
transactionProxy.delete(TestModel.TEST_PATH);
- verify(mockActorContext).sendRemoteOperationAsync(
- eq(actorSelection(actorRef)), eqDeleteData());
+ verify(mockActorContext).executeRemoteOperationAsync(
+ eq(actorSelection(actorRef)), eqDeleteData(), anyDuration());
+
+ verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
+ DeleteDataReply.SERIALIZABLE_CLASS);
+ }
+
+ private void verifyCohortPathFutures(ThreePhaseCommitCohortProxy proxy,
+ Object... expReplies) throws Exception {
+ assertEquals("getReadyOperationFutures size", expReplies.length,
+ proxy.getCohortPathFutures().size());
+
+ int i = 0;
+ for( Future<ActorPath> future: proxy.getCohortPathFutures()) {
+ assertNotNull("Ready operation Future is null", future);
+
+ Object expReply = expReplies[i++];
+ if(expReply instanceof ActorPath) {
+ ActorPath actual = Await.result(future, Duration.create(5, TimeUnit.SECONDS));
+ assertEquals("Cohort actor path", expReply, actual);
+ } else {
+ // Expecting exception.
+ try {
+ Await.result(future, Duration.create(5, TimeUnit.SECONDS));
+ fail("Expected exception from ready operation Future");
+ } catch(Exception e) {
+ // Expected
+ }
+ }
+ }
}
@SuppressWarnings("unchecked")
public void testReady() throws Exception {
ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_WRITE);
+ NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+
doReturn(readDataReply(null)).when(mockActorContext).executeRemoteOperationAsync(
eq(actorSelection(actorRef)), eqReadData(), anyDuration());
- doReturn(readyTxReply(actorRef.path())).when(mockActorContext).executeRemoteOperation(
+ doReturn(writeDataReply()).when(mockActorContext).executeRemoteOperationAsync(
+ eq(actorSelection(actorRef)), eqWriteData(nodeToWrite), anyDuration());
+
+ doReturn(readyTxReply(actorRef.path())).when(mockActorContext).executeRemoteOperationAsync(
eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS), anyDuration());
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
transactionProxy.read(TestModel.TEST_PATH);
+ transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
+
+ DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
+
+ assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
+
+ ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
+
+ verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
+ WriteDataReply.SERIALIZABLE_CLASS);
+
+ verifyCohortPathFutures(proxy, actorRef.path());
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testReadyWithRecordingOperationFailure() throws Exception {
+ ActorRef actorRef = setupActorContextWithInitialCreateTransaction(WRITE_ONLY);
+
+ NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+
+ doReturn(mergeDataReply()).when(mockActorContext).executeRemoteOperationAsync(
+ eq(actorSelection(actorRef)), eqMergeData(nodeToWrite), anyDuration());
+
+ doReturn(Futures.failed(new TestException())).when(mockActorContext).
+ executeRemoteOperationAsync(eq(actorSelection(actorRef)), eqWriteData(nodeToWrite),
+ anyDuration());
+
+ doReturn(readyTxReply(actorRef.path())).when(mockActorContext).executeRemoteOperationAsync(
+ eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS), anyDuration());
+
+ TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
+ WRITE_ONLY, schemaContext);
+
+ transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
+
+ transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
+
+ DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
+
+ assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
+
+ ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
+
+ verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
+ MergeDataReply.SERIALIZABLE_CLASS, TestException.class);
+
+ verifyCohortPathFutures(proxy, TestException.class);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testReadyWithReplyFailure() throws Exception {
+ ActorRef actorRef = setupActorContextWithInitialCreateTransaction(WRITE_ONLY);
+
+ NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+
+ doReturn(mergeDataReply()).when(mockActorContext).executeRemoteOperationAsync(
+ eq(actorSelection(actorRef)), eqMergeData(nodeToWrite), anyDuration());
+
+ doReturn(Futures.failed(new TestException())).when(mockActorContext).
+ executeRemoteOperationAsync(eq(actorSelection(actorRef)),
+ isA(ReadyTransaction.SERIALIZABLE_CLASS), anyDuration());
+
+ TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
+ WRITE_ONLY, schemaContext);
+
+ transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
+
+ DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
+
+ assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
+
+ ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
+
+ verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
+ MergeDataReply.SERIALIZABLE_CLASS);
+
+ verifyCohortPathFutures(proxy, TestException.class);
+ }
+
+ @Test
+ public void testReadyWithInitialCreateTransactionFailure() throws Exception {
+
+ doThrow(new PrimaryNotFoundException("mock")).when(mockActorContext).executeShardOperation(
+ anyString(), any(), anyDuration());
+
+ TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
+ WRITE_ONLY, schemaContext);
+
+ NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+
+ transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
+
+ transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
+
+ transactionProxy.delete(TestModel.TEST_PATH);
+
+ DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
+
+ assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
+
+ ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
+
+ verifyCohortPathFutures(proxy, PrimaryNotFoundException.class);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testReadyWithInvalidReplyMessageType() throws Exception {
+ ActorRef actorRef = setupActorContextWithInitialCreateTransaction(WRITE_ONLY);
+
+ NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+
+ doReturn(writeDataReply()).when(mockActorContext).executeRemoteOperationAsync(
+ eq(actorSelection(actorRef)), eqWriteData(nodeToWrite), anyDuration());
+
+ doReturn(Futures.successful(new Object())).when(mockActorContext).
+ executeRemoteOperationAsync(eq(actorSelection(actorRef)),
+ isA(ReadyTransaction.SERIALIZABLE_CLASS), anyDuration());
+
+ TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
+ WRITE_ONLY, schemaContext);
+
+ transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
+
DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
- assertEquals("getCohortPaths", Arrays.asList(actorRef.path()), proxy.getCohortPaths());
+ verifyCohortPathFutures(proxy, IllegalArgumentException.class);
}
@Test