import akka.actor.ActorPath;
import akka.actor.ActorSelection;
+
import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListenableFutureTask;
+import com.google.common.util.concurrent.ListeningExecutorService;
+
import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
/**
* ThreePhaseCommitCohortProxy represents a set of remote cohort proxies
private final ActorContext actorContext;
private final List<ActorPath> cohortPaths;
- private final ExecutorService executor;
+ private final ListeningExecutorService executor;
private final String transactionId;
public ThreePhaseCommitCohortProxy(ActorContext actorContext,
List<ActorPath> cohortPaths,
String transactionId,
- ExecutorService executor) {
+ ListeningExecutorService executor) {
this.actorContext = actorContext;
this.cohortPaths = cohortPaths;
}
@Override public ListenableFuture<Boolean> canCommit() {
- Callable<Boolean> call = new Callable() {
-
- @Override public Boolean call() throws Exception {
- for(ActorPath actorPath : cohortPaths){
- ActorSelection cohort = actorContext.actorSelection(actorPath);
-
- try {
- Object response =
- actorContext.executeRemoteOperation(cohort,
- new CanCommitTransaction().toSerializable(),
- ActorContext.ASK_DURATION);
-
- if (response.getClass().equals(CanCommitTransactionReply.SERIALIZABLE_CLASS)) {
- CanCommitTransactionReply reply =
- CanCommitTransactionReply.fromSerializable(response);
- if (!reply.getCanCommit()) {
- return false;
+ LOG.debug("txn {} canCommit", transactionId);
+ Callable<Boolean> call = new Callable<Boolean>() {
+
+ @Override
+ public Boolean call() throws Exception {
+ for(ActorPath actorPath : cohortPaths){
+
+ Object message = new CanCommitTransaction().toSerializable();
+ LOG.debug("txn {} Sending {} to {}", transactionId, message, actorPath);
+
+ ActorSelection cohort = actorContext.actorSelection(actorPath);
+
+ try {
+ Object response =
+ actorContext.executeRemoteOperation(cohort,
+ message,
+ ActorContext.ASK_DURATION);
+
+ if (response.getClass().equals(CanCommitTransactionReply.SERIALIZABLE_CLASS)) {
+ CanCommitTransactionReply reply =
+ CanCommitTransactionReply.fromSerializable(response);
+ if (!reply.getCanCommit()) {
+ return false;
+ }
}
+ } catch(RuntimeException e){
+ // FIXME : Need to properly handle this
+ LOG.error("Unexpected Exception", e);
+ return false;
}
- } catch(RuntimeException e){
- LOG.error("Unexpected Exception", e);
- return false;
}
-
- }
- return true;
+ return true;
}
};
- ListenableFutureTask<Boolean>
- future = ListenableFutureTask.create(call);
-
- executor.submit(future);
-
- return future;
+ return executor.submit(call);
}
@Override public ListenableFuture<Void> preCommit() {
+ LOG.debug("txn {} preCommit", transactionId);
return voidOperation(new PreCommitTransaction().toSerializable(), PreCommitTransactionReply.SERIALIZABLE_CLASS);
}
@Override public ListenableFuture<Void> abort() {
+ LOG.debug("txn {} abort", transactionId);
return voidOperation(new AbortTransaction().toSerializable(), AbortTransactionReply.SERIALIZABLE_CLASS);
}
@Override public ListenableFuture<Void> commit() {
+ LOG.debug("txn {} commit", transactionId);
return voidOperation(new CommitTransaction().toSerializable(), CommitTransactionReply.SERIALIZABLE_CLASS);
}
for(ActorPath actorPath : cohortPaths){
ActorSelection cohort = actorContext.actorSelection(actorPath);
+ LOG.debug("txn {} Sending {} to {}", transactionId, message, actorPath);
+
try {
Object response =
actorContext.executeRemoteOperation(cohort,
}
};
- ListenableFutureTask<Void>
- future = ListenableFutureTask.create(call);
-
- executor.submit(future);
-
- return future;
-
+ return executor.submit(call);
}
public List<ActorPath> getCohortPaths() {