import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
/**
* ThreePhaseCommitCohortProxy represents a set of remote cohort proxies
private final ActorContext actorContext;
private final List<ActorPath> cohortPaths;
- //FIXME : Use a thread pool here
- private final ExecutorService executorService = Executors.newSingleThreadExecutor();
+ private final ExecutorService executor;
+ private final String transactionId;
- public ThreePhaseCommitCohortProxy(ActorContext actorContext, List<ActorPath> cohortPaths) {
+ public ThreePhaseCommitCohortProxy(ActorContext actorContext,
+ List<ActorPath> cohortPaths,
+ String transactionId,
+ ExecutorService executor) {
+
this.actorContext = actorContext;
this.cohortPaths = cohortPaths;
+ this.transactionId = transactionId;
+ this.executor = executor;
}
@Override public ListenableFuture<Boolean> canCommit() {
try {
Object response =
actorContext.executeRemoteOperation(cohort,
- new CanCommitTransaction(),
+ new CanCommitTransaction().toSerializable(),
ActorContext.ASK_DURATION);
- if (response instanceof CanCommitTransactionReply) {
+ if (response.getClass().equals(CanCommitTransactionReply.SERIALIZABLE_CLASS)) {
CanCommitTransactionReply reply =
- (CanCommitTransactionReply) response;
+ CanCommitTransactionReply.fromSerializable(response);
if (!reply.getCanCommit()) {
return false;
}
ListenableFutureTask<Boolean>
future = ListenableFutureTask.create(call);
- executorService.submit(future);
+ executor.submit(future);
return future;
}
@Override public ListenableFuture<Void> preCommit() {
- return voidOperation(new PreCommitTransaction(), PreCommitTransactionReply.class);
+ return voidOperation(new PreCommitTransaction().toSerializable(), PreCommitTransactionReply.SERIALIZABLE_CLASS);
}
@Override public ListenableFuture<Void> abort() {
- return voidOperation(new AbortTransaction(), AbortTransactionReply.class);
+ return voidOperation(new AbortTransaction().toSerializable(), AbortTransactionReply.SERIALIZABLE_CLASS);
}
@Override public ListenableFuture<Void> commit() {
- return voidOperation(new CommitTransaction(), CommitTransactionReply.class);
+ return voidOperation(new CommitTransaction().toSerializable(), CommitTransactionReply.SERIALIZABLE_CLASS);
}
private ListenableFuture<Void> voidOperation(final Object message, final Class expectedResponseClass){
ListenableFutureTask<Void>
future = ListenableFutureTask.create(call);
- executorService.submit(future);
+ executor.submit(future);
return future;