Merge "Re-added config.version to config-module-archetype."
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / ThreePhaseCommitCohortProxy.java
index 36b6efa2f7091d9f178d682aded4fc4888f42dff..279ecba40977e1293a9fbad090cd1a0dfddff2a4 100644 (file)
@@ -12,6 +12,7 @@ import akka.actor.ActorPath;
 import akka.actor.ActorSelection;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListenableFutureTask;
+import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
@@ -29,7 +30,6 @@ import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 
 /**
  * ThreePhaseCommitCohortProxy represents a set of remote cohort proxies
@@ -42,13 +42,19 @@ public class ThreePhaseCommitCohortProxy implements
 
     private final ActorContext actorContext;
     private final List<ActorPath> cohortPaths;
-    //FIXME : Use a thread pool here
-    private final ExecutorService executorService = Executors.newSingleThreadExecutor();
+    private final ExecutorService executor;
+    private final String transactionId;
 
 
-    public ThreePhaseCommitCohortProxy(ActorContext actorContext, List<ActorPath> cohortPaths) {
+    public ThreePhaseCommitCohortProxy(ActorContext actorContext,
+        List<ActorPath> cohortPaths,
+        String transactionId,
+        ExecutorService executor) {
+
         this.actorContext = actorContext;
         this.cohortPaths = cohortPaths;
+        this.transactionId = transactionId;
+        this.executor = executor;
     }
 
     @Override public ListenableFuture<Boolean> canCommit() {
@@ -85,7 +91,7 @@ public class ThreePhaseCommitCohortProxy implements
         ListenableFutureTask<Boolean>
             future = ListenableFutureTask.create(call);
 
-        executorService.submit(future);
+        executor.submit(future);
 
         return future;
     }
@@ -109,16 +115,23 @@ public class ThreePhaseCommitCohortProxy implements
                 for(ActorPath actorPath : cohortPaths){
                     ActorSelection cohort = actorContext.actorSelection(actorPath);
 
-                    Object response = actorContext.executeRemoteOperation(cohort,
-                        message,
-                        ActorContext.ASK_DURATION);
-
-                    if(response != null && !response.getClass().equals(expectedResponseClass)){
-                        throw new RuntimeException(
-                            String.format(
-                                "did not get the expected response \n\t\t expected : %s \n\t\t actual   : %s",
-                                expectedResponseClass.toString(),
-                                response.getClass().toString()));
+                    try {
+                        Object response =
+                            actorContext.executeRemoteOperation(cohort,
+                                message,
+                                ActorContext.ASK_DURATION);
+
+                        if (response != null && !response.getClass()
+                            .equals(expectedResponseClass)) {
+                            throw new RuntimeException(
+                                String.format(
+                                    "did not get the expected response \n\t\t expected : %s \n\t\t actual   : %s",
+                                    expectedResponseClass.toString(),
+                                    response.getClass().toString())
+                            );
+                        }
+                    } catch(TimeoutException e){
+                        LOG.error(String.format("A timeout occurred when processing operation : %s", message));
                     }
                 }
                 return null;
@@ -128,7 +141,7 @@ public class ThreePhaseCommitCohortProxy implements
         ListenableFutureTask<Void>
             future = ListenableFutureTask.create(call);
 
-        executorService.submit(future);
+        executor.submit(future);
 
         return future;