Merge "Optimizations, Monitoring and Logging"
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / ThreePhaseCommitCohortProxy.java
index 197b3b70cefa292e3829b776560b4cd2ae99967f..5b447943ea7fd798e5572e55483ff8b4fcf7e331 100644 (file)
@@ -9,11 +9,28 @@
 package org.opendaylight.controller.cluster.datastore;
 
 import akka.actor.ActorPath;
+import akka.actor.ActorSelection;
+
 import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+
+import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
+import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
+import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
+import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
+import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
+import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
+import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
+import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransaction;
+import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransactionReply;
+import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.Collections;
 import java.util.List;
+import java.util.concurrent.Callable;
 
 /**
  * ThreePhaseCommitCohortProxy represents a set of remote cohort proxies
@@ -21,27 +38,114 @@ import java.util.List;
 public class ThreePhaseCommitCohortProxy implements
     DOMStoreThreePhaseCommitCohort{
 
+    private static final Logger
+        LOG = LoggerFactory.getLogger(DistributedDataStore.class);
+
+    private final ActorContext actorContext;
     private final List<ActorPath> cohortPaths;
+    private final ListeningExecutorService executor;
+    private final String transactionId;
 
-    public ThreePhaseCommitCohortProxy(List<ActorPath> cohortPaths) {
 
+    public ThreePhaseCommitCohortProxy(ActorContext actorContext,
+        List<ActorPath> cohortPaths,
+        String transactionId,
+        ListeningExecutorService executor) {
+
+        this.actorContext = actorContext;
         this.cohortPaths = cohortPaths;
+        this.transactionId = transactionId;
+        this.executor = executor;
     }
 
     @Override public ListenableFuture<Boolean> canCommit() {
-        throw new UnsupportedOperationException("canCommit");
+        LOG.debug("txn {} canCommit", transactionId);
+        Callable<Boolean> call = new Callable<Boolean>() {
+
+            @Override
+            public Boolean call() throws Exception {
+                for(ActorPath actorPath : cohortPaths){
+
+                    Object message = new CanCommitTransaction().toSerializable();
+                    LOG.debug("txn {} Sending {} to {}", transactionId, message, actorPath);
+
+                    ActorSelection cohort = actorContext.actorSelection(actorPath);
+
+                    try {
+                        Object response =
+                                actorContext.executeRemoteOperation(cohort,
+                                        message,
+                                        ActorContext.ASK_DURATION);
+
+                        if (response.getClass().equals(CanCommitTransactionReply.SERIALIZABLE_CLASS)) {
+                            CanCommitTransactionReply reply =
+                                    CanCommitTransactionReply.fromSerializable(response);
+                            if (!reply.getCanCommit()) {
+                                return false;
+                            }
+                        }
+                    } catch(RuntimeException e){
+                        // FIXME : Need to properly handle this
+                        LOG.error("Unexpected Exception", e);
+                        return false;
+                    }
+                }
+
+                return true;
+            }
+        };
+
+        return executor.submit(call);
     }
 
     @Override public ListenableFuture<Void> preCommit() {
-        throw new UnsupportedOperationException("preCommit");
+        LOG.debug("txn {} preCommit", transactionId);
+        return voidOperation(new PreCommitTransaction().toSerializable(), PreCommitTransactionReply.SERIALIZABLE_CLASS);
     }
 
     @Override public ListenableFuture<Void> abort() {
-        throw new UnsupportedOperationException("abort");
+        LOG.debug("txn {} abort", transactionId);
+        return voidOperation(new AbortTransaction().toSerializable(), AbortTransactionReply.SERIALIZABLE_CLASS);
     }
 
     @Override public ListenableFuture<Void> commit() {
-        throw new UnsupportedOperationException("commit");
+        LOG.debug("txn {} commit", transactionId);
+        return voidOperation(new CommitTransaction().toSerializable(), CommitTransactionReply.SERIALIZABLE_CLASS);
+    }
+
+    private ListenableFuture<Void> voidOperation(final Object message, final Class expectedResponseClass){
+        Callable<Void> call = new Callable<Void>() {
+
+            @Override public Void call() throws Exception {
+                for(ActorPath actorPath : cohortPaths){
+                    ActorSelection cohort = actorContext.actorSelection(actorPath);
+
+                    LOG.debug("txn {} Sending {} to {}", transactionId, message, actorPath);
+
+                    try {
+                        Object response =
+                            actorContext.executeRemoteOperation(cohort,
+                                message,
+                                ActorContext.ASK_DURATION);
+
+                        if (response != null && !response.getClass()
+                            .equals(expectedResponseClass)) {
+                            throw new RuntimeException(
+                                String.format(
+                                    "did not get the expected response \n\t\t expected : %s \n\t\t actual   : %s",
+                                    expectedResponseClass.toString(),
+                                    response.getClass().toString())
+                            );
+                        }
+                    } catch(TimeoutException e){
+                        LOG.error(String.format("A timeout occurred when processing operation : %s", message));
+                    }
+                }
+                return null;
+            }
+        };
+
+        return executor.submit(call);
     }
 
     public List<ActorPath> getCohortPaths() {