Merge "Created Sample Feature Test Class for Base Feature Repository"
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / ThreePhaseCommitCohortProxy.java
index d12dc2b55a17f0ebaf1a3999778b109efbd60a0f..b56dc9432f0b28067ca2daaba1cd95f936cb816e 100644 (file)
@@ -30,7 +30,6 @@ import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 
 /**
  * ThreePhaseCommitCohortProxy represents a set of remote cohort proxies
@@ -43,13 +42,19 @@ public class ThreePhaseCommitCohortProxy implements
 
     private final ActorContext actorContext;
     private final List<ActorPath> cohortPaths;
-    //FIXME : Use a thread pool here
-    private final ExecutorService executorService = Executors.newSingleThreadExecutor();
+    private final ExecutorService executor;
+    private final String transactionId;
 
 
-    public ThreePhaseCommitCohortProxy(ActorContext actorContext, List<ActorPath> cohortPaths) {
+    public ThreePhaseCommitCohortProxy(ActorContext actorContext,
+        List<ActorPath> cohortPaths,
+        String transactionId,
+        ExecutorService executor) {
+
         this.actorContext = actorContext;
         this.cohortPaths = cohortPaths;
+        this.transactionId = transactionId;
+        this.executor = executor;
     }
 
     @Override public ListenableFuture<Boolean> canCommit() {
@@ -62,12 +67,12 @@ public class ThreePhaseCommitCohortProxy implements
                 try {
                     Object response =
                         actorContext.executeRemoteOperation(cohort,
-                            new CanCommitTransaction(),
+                            new CanCommitTransaction().toSerializable(),
                             ActorContext.ASK_DURATION);
 
-                    if (response instanceof CanCommitTransactionReply) {
+                    if (response.getClass().equals(CanCommitTransactionReply.SERIALIZABLE_CLASS)) {
                         CanCommitTransactionReply reply =
-                            (CanCommitTransactionReply) response;
+                            CanCommitTransactionReply.fromSerializable(response);
                         if (!reply.getCanCommit()) {
                             return false;
                         }
@@ -86,21 +91,21 @@ public class ThreePhaseCommitCohortProxy implements
         ListenableFutureTask<Boolean>
             future = ListenableFutureTask.create(call);
 
-        executorService.submit(future);
+        executor.submit(future);
 
         return future;
     }
 
     @Override public ListenableFuture<Void> preCommit() {
-        return voidOperation(new PreCommitTransaction(), PreCommitTransactionReply.class);
+        return voidOperation(new PreCommitTransaction().toSerializable(), PreCommitTransactionReply.SERIALIZABLE_CLASS);
     }
 
     @Override public ListenableFuture<Void> abort() {
-        return voidOperation(new AbortTransaction(), AbortTransactionReply.class);
+        return voidOperation(new AbortTransaction().toSerializable(), AbortTransactionReply.SERIALIZABLE_CLASS);
     }
 
     @Override public ListenableFuture<Void> commit() {
-        return voidOperation(new CommitTransaction(), CommitTransactionReply.class);
+        return voidOperation(new CommitTransaction().toSerializable(), CommitTransactionReply.SERIALIZABLE_CLASS);
     }
 
     private ListenableFuture<Void> voidOperation(final Object message, final Class expectedResponseClass){
@@ -136,7 +141,7 @@ public class ThreePhaseCommitCohortProxy implements
         ListenableFutureTask<Void>
             future = ListenableFutureTask.create(call);
 
-        executorService.submit(future);
+        executor.submit(future);
 
         return future;