}
@Override public ListenableFuture<Boolean> canCommit() {
+ LOG.debug("txn {} canCommit", transactionId);
Callable<Boolean> call = new Callable<Boolean>() {
@Override
public Boolean call() throws Exception {
for(ActorPath actorPath : cohortPaths){
+
+ Object message = new CanCommitTransaction().toSerializable();
+ LOG.debug("txn {} Sending {} to {}", transactionId, message, actorPath);
+
ActorSelection cohort = actorContext.actorSelection(actorPath);
try {
Object response =
actorContext.executeRemoteOperation(cohort,
- new CanCommitTransaction().toSerializable(),
+ message,
ActorContext.ASK_DURATION);
if (response.getClass().equals(CanCommitTransactionReply.SERIALIZABLE_CLASS)) {
}
}
} catch(RuntimeException e){
+ // FIXME : Need to properly handle this
LOG.error("Unexpected Exception", e);
return false;
}
}
@Override public ListenableFuture<Void> preCommit() {
+ LOG.debug("txn {} preCommit", transactionId);
return voidOperation(new PreCommitTransaction().toSerializable(), PreCommitTransactionReply.SERIALIZABLE_CLASS);
}
@Override public ListenableFuture<Void> abort() {
+ LOG.debug("txn {} abort", transactionId);
return voidOperation(new AbortTransaction().toSerializable(), AbortTransactionReply.SERIALIZABLE_CLASS);
}
@Override public ListenableFuture<Void> commit() {
+ LOG.debug("txn {} commit", transactionId);
return voidOperation(new CommitTransaction().toSerializable(), CommitTransactionReply.SERIALIZABLE_CLASS);
}
for(ActorPath actorPath : cohortPaths){
ActorSelection cohort = actorContext.actorSelection(actorPath);
+ LOG.debug("txn {} Sending {} to {}", transactionId, message, actorPath);
+
try {
Object response =
actorContext.executeRemoteOperation(cohort,