Merge "Created Sample Feature Test Class for Base Feature Repository"
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / ThreePhaseCommitCohortProxy.java
index 36b6efa2f7091d9f178d682aded4fc4888f42dff..b56dc9432f0b28067ca2daaba1cd95f936cb816e 100644 (file)
@@ -12,6 +12,7 @@ import akka.actor.ActorPath;
 import akka.actor.ActorSelection;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListenableFutureTask;
+import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
@@ -29,7 +30,6 @@ import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 
 /**
  * ThreePhaseCommitCohortProxy represents a set of remote cohort proxies
@@ -42,13 +42,19 @@ public class ThreePhaseCommitCohortProxy implements
 
     private final ActorContext actorContext;
     private final List<ActorPath> cohortPaths;
-    //FIXME : Use a thread pool here
-    private final ExecutorService executorService = Executors.newSingleThreadExecutor();
+    private final ExecutorService executor;
+    private final String transactionId;
 
 
-    public ThreePhaseCommitCohortProxy(ActorContext actorContext, List<ActorPath> cohortPaths) {
+    public ThreePhaseCommitCohortProxy(ActorContext actorContext,
+        List<ActorPath> cohortPaths,
+        String transactionId,
+        ExecutorService executor) {
+
         this.actorContext = actorContext;
         this.cohortPaths = cohortPaths;
+        this.transactionId = transactionId;
+        this.executor = executor;
     }
 
     @Override public ListenableFuture<Boolean> canCommit() {
@@ -61,12 +67,12 @@ public class ThreePhaseCommitCohortProxy implements
                 try {
                     Object response =
                         actorContext.executeRemoteOperation(cohort,
-                            new CanCommitTransaction(),
+                            new CanCommitTransaction().toSerializable(),
                             ActorContext.ASK_DURATION);
 
-                    if (response instanceof CanCommitTransactionReply) {
+                    if (response.getClass().equals(CanCommitTransactionReply.SERIALIZABLE_CLASS)) {
                         CanCommitTransactionReply reply =
-                            (CanCommitTransactionReply) response;
+                            CanCommitTransactionReply.fromSerializable(response);
                         if (!reply.getCanCommit()) {
                             return false;
                         }
@@ -85,21 +91,21 @@ public class ThreePhaseCommitCohortProxy implements
         ListenableFutureTask<Boolean>
             future = ListenableFutureTask.create(call);
 
-        executorService.submit(future);
+        executor.submit(future);
 
         return future;
     }
 
     @Override public ListenableFuture<Void> preCommit() {
-        return voidOperation(new PreCommitTransaction(), PreCommitTransactionReply.class);
+        return voidOperation(new PreCommitTransaction().toSerializable(), PreCommitTransactionReply.SERIALIZABLE_CLASS);
     }
 
     @Override public ListenableFuture<Void> abort() {
-        return voidOperation(new AbortTransaction(), AbortTransactionReply.class);
+        return voidOperation(new AbortTransaction().toSerializable(), AbortTransactionReply.SERIALIZABLE_CLASS);
     }
 
     @Override public ListenableFuture<Void> commit() {
-        return voidOperation(new CommitTransaction(), CommitTransactionReply.class);
+        return voidOperation(new CommitTransaction().toSerializable(), CommitTransactionReply.SERIALIZABLE_CLASS);
     }
 
     private ListenableFuture<Void> voidOperation(final Object message, final Class expectedResponseClass){
@@ -109,16 +115,23 @@ public class ThreePhaseCommitCohortProxy implements
                 for(ActorPath actorPath : cohortPaths){
                     ActorSelection cohort = actorContext.actorSelection(actorPath);
 
-                    Object response = actorContext.executeRemoteOperation(cohort,
-                        message,
-                        ActorContext.ASK_DURATION);
-
-                    if(response != null && !response.getClass().equals(expectedResponseClass)){
-                        throw new RuntimeException(
-                            String.format(
-                                "did not get the expected response \n\t\t expected : %s \n\t\t actual   : %s",
-                                expectedResponseClass.toString(),
-                                response.getClass().toString()));
+                    try {
+                        Object response =
+                            actorContext.executeRemoteOperation(cohort,
+                                message,
+                                ActorContext.ASK_DURATION);
+
+                        if (response != null && !response.getClass()
+                            .equals(expectedResponseClass)) {
+                            throw new RuntimeException(
+                                String.format(
+                                    "did not get the expected response \n\t\t expected : %s \n\t\t actual   : %s",
+                                    expectedResponseClass.toString(),
+                                    response.getClass().toString())
+                            );
+                        }
+                    } catch(TimeoutException e){
+                        LOG.error(String.format("A timeout occurred when processing operation : %s", message));
                     }
                 }
                 return null;
@@ -128,7 +141,7 @@ public class ThreePhaseCommitCohortProxy implements
         ListenableFutureTask<Void>
             future = ListenableFutureTask.create(call);
 
-        executorService.submit(future);
+        executor.submit(future);
 
         return future;