import akka.actor.ActorPath;
import akka.actor.ActorSelection;
+
import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListenableFutureTask;
+import com.google.common.util.concurrent.ListeningExecutorService;
+
import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
/**
* ThreePhaseCommitCohortProxy represents a set of remote cohort proxies
private final ActorContext actorContext;
private final List<ActorPath> cohortPaths;
- //FIXME : Use a thread pool here
- private final ExecutorService executorService = Executors.newSingleThreadExecutor();
+ private final ListeningExecutorService executor;
+ private final String transactionId;
+
+ public ThreePhaseCommitCohortProxy(ActorContext actorContext,
+ List<ActorPath> cohortPaths,
+ String transactionId,
+ ListeningExecutorService executor) {
- public ThreePhaseCommitCohortProxy(ActorContext actorContext, List<ActorPath> cohortPaths) {
this.actorContext = actorContext;
this.cohortPaths = cohortPaths;
+ this.transactionId = transactionId;
+ this.executor = executor;
}
@Override public ListenableFuture<Boolean> canCommit() {
- Callable<Boolean> call = new Callable() {
-
- @Override public Boolean call() throws Exception {
- for(ActorPath actorPath : cohortPaths){
- ActorSelection cohort = actorContext.actorSelection(actorPath);
-
- try {
- Object response =
- actorContext.executeRemoteOperation(cohort,
- new CanCommitTransaction(),
- ActorContext.ASK_DURATION);
-
- if (response instanceof CanCommitTransactionReply) {
- CanCommitTransactionReply reply =
- (CanCommitTransactionReply) response;
- if (!reply.getCanCommit()) {
- return false;
+ LOG.debug("txn {} canCommit", transactionId);
+ Callable<Boolean> call = new Callable<Boolean>() {
+
+ @Override
+ public Boolean call() throws Exception {
+ for(ActorPath actorPath : cohortPaths){
+
+ Object message = new CanCommitTransaction().toSerializable();
+ LOG.debug("txn {} Sending {} to {}", transactionId, message, actorPath);
+
+ ActorSelection cohort = actorContext.actorSelection(actorPath);
+
+ try {
+ Object response =
+ actorContext.executeRemoteOperation(cohort,
+ message,
+ ActorContext.ASK_DURATION);
+
+ if (response.getClass().equals(CanCommitTransactionReply.SERIALIZABLE_CLASS)) {
+ CanCommitTransactionReply reply =
+ CanCommitTransactionReply.fromSerializable(response);
+ if (!reply.getCanCommit()) {
+ return false;
+ }
}
+ } catch(RuntimeException e){
+ // FIXME : Need to properly handle this
+ LOG.error("Unexpected Exception", e);
+ return false;
}
- } catch(RuntimeException e){
- LOG.error("Unexpected Exception", e);
- return false;
}
-
- }
- return true;
+ return true;
}
};
- ListenableFutureTask<Boolean>
- future = ListenableFutureTask.create(call);
-
- executorService.submit(future);
-
- return future;
+ return executor.submit(call);
}
@Override public ListenableFuture<Void> preCommit() {
- return voidOperation(new PreCommitTransaction(), PreCommitTransactionReply.class);
+ LOG.debug("txn {} preCommit", transactionId);
+ return voidOperation(new PreCommitTransaction().toSerializable(), PreCommitTransactionReply.SERIALIZABLE_CLASS);
}
@Override public ListenableFuture<Void> abort() {
- return voidOperation(new AbortTransaction(), AbortTransactionReply.class);
+ LOG.debug("txn {} abort", transactionId);
+ return voidOperation(new AbortTransaction().toSerializable(), AbortTransactionReply.SERIALIZABLE_CLASS);
}
@Override public ListenableFuture<Void> commit() {
- return voidOperation(new CommitTransaction(), CommitTransactionReply.class);
+ LOG.debug("txn {} commit", transactionId);
+ return voidOperation(new CommitTransaction().toSerializable(), CommitTransactionReply.SERIALIZABLE_CLASS);
}
private ListenableFuture<Void> voidOperation(final Object message, final Class expectedResponseClass){
for(ActorPath actorPath : cohortPaths){
ActorSelection cohort = actorContext.actorSelection(actorPath);
+ LOG.debug("txn {} Sending {} to {}", transactionId, message, actorPath);
+
try {
Object response =
actorContext.executeRemoteOperation(cohort,
}
};
- ListenableFutureTask<Void>
- future = ListenableFutureTask.create(call);
-
- executorService.submit(future);
-
- return future;
-
+ return executor.submit(call);
}
public List<ActorPath> getCohortPaths() {