Bug 4105: Pass ModuleShardConfiguration with CreateShard
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / ThreePhaseCommitCohortProxy.java
index c479da73127760977d4c27b3ff9873d10c295c57..09b6568e1ade016accf0ab80788065950d469ec1 100644 (file)
@@ -11,11 +11,11 @@ package org.opendaylight.controller.cluster.datastore;
 import akka.actor.ActorSelection;
 import akka.dispatch.Futures;
 import akka.dispatch.OnComplete;
-import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.SettableFuture;
 import java.util.Collections;
+import java.util.Iterator;
 import java.util.List;
 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
@@ -24,7 +24,6 @@ import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransacti
 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.concurrent.Future;
@@ -33,30 +32,15 @@ import scala.runtime.AbstractFunction1;
 /**
  * ThreePhaseCommitCohortProxy represents a set of remote cohort proxies
  */
-public class ThreePhaseCommitCohortProxy implements DOMStoreThreePhaseCommitCohort{
+public class ThreePhaseCommitCohortProxy extends AbstractThreePhaseCommitCohort<ActorSelection> {
 
     private static final Logger LOG = LoggerFactory.getLogger(ThreePhaseCommitCohortProxy.class);
 
-    private static final ListenableFuture<Void> IMMEDIATE_SUCCESS =
-            com.google.common.util.concurrent.Futures.immediateFuture(null);
-
     private final ActorContext actorContext;
     private final List<Future<ActorSelection>> cohortFutures;
     private volatile List<ActorSelection> cohorts;
     private final String transactionId;
-    private static final OperationCallback NO_OP_CALLBACK = new OperationCallback() {
-        @Override
-        public void run() {
-        }
-
-        @Override
-        public void success() {
-        }
-
-        @Override
-        public void failure() {
-        }
-    };
+    private volatile OperationCallback commitOperationCallback;
 
     public ThreePhaseCommitCohortProxy(ActorContext actorContext,
             List<Future<ActorSelection>> cohortFutures, String transactionId) {
@@ -80,7 +64,7 @@ public class ThreePhaseCommitCohortProxy implements DOMStoreThreePhaseCommitCoho
                 }
                 return null;
             }
-        }, TransactionProxy.SAME_FAILURE_TRANSFORMER, actorContext.getClientDispatcher());
+        }, TransactionReadyReplyMapper.SAME_FAILURE_TRANSFORMER, actorContext.getClientDispatcher());
     }
 
     @Override
@@ -117,45 +101,72 @@ public class ThreePhaseCommitCohortProxy implements DOMStoreThreePhaseCommitCoho
         if(LOG.isDebugEnabled()) {
             LOG.debug("Tx {} finishCanCommit", transactionId);
         }
-        // The last phase of canCommit is to invoke all the cohort actors asynchronously to perform
-        // their canCommit processing. If any one fails then we'll fail canCommit.
 
-        Future<Iterable<Object>> combinedFuture =
-                invokeCohorts(new CanCommitTransaction(transactionId).toSerializable());
+        // For empty transactions return immediately
+        if(cohorts.size() == 0){
+            if(LOG.isDebugEnabled()) {
+                LOG.debug("Tx {}: canCommit returning result: {}", transactionId, true);
+            }
+            returnFuture.set(Boolean.TRUE);
+            return;
+        }
+
+        commitOperationCallback = cohortFutures.isEmpty() ? OperationCallback.NO_OP_CALLBACK :
+            new TransactionRateLimitingCallback(actorContext);
 
-        combinedFuture.onComplete(new OnComplete<Iterable<Object>>() {
+        commitOperationCallback.run();
+
+        final Object message = new CanCommitTransaction(transactionId).toSerializable();
+
+        final Iterator<ActorSelection> iterator = cohorts.iterator();
+
+        final OnComplete<Object> onComplete = new OnComplete<Object>() {
             @Override
-            public void onComplete(Throwable failure, Iterable<Object> responses) throws Throwable {
-                if(failure != null) {
-                    if(LOG.isDebugEnabled()) {
+            public void onComplete(Throwable failure, Object response) throws Throwable {
+                if (failure != null) {
+                    if (LOG.isDebugEnabled()) {
                         LOG.debug("Tx {}: a canCommit cohort Future failed: {}", transactionId, failure);
                     }
                     returnFuture.setException(failure);
+                    commitOperationCallback.failure();
                     return;
                 }
 
+                // Only the first call to pause takes effect - subsequent calls before resume are no-ops. So
+                // this means we'll only time the first transaction canCommit which should be fine.
+                commitOperationCallback.pause();
+
                 boolean result = true;
-                for(Object response: responses) {
-                    if (response.getClass().equals(CanCommitTransactionReply.SERIALIZABLE_CLASS)) {
-                        CanCommitTransactionReply reply =
-                                CanCommitTransactionReply.fromSerializable(response);
-                        if (!reply.getCanCommit()) {
-                            result = false;
-                            break;
-                        }
-                    } else {
-                        LOG.error("Unexpected response type {}", response.getClass());
-                        returnFuture.setException(new IllegalArgumentException(
-                                String.format("Unexpected response type %s", response.getClass())));
-                        return;
+                if (response.getClass().equals(CanCommitTransactionReply.SERIALIZABLE_CLASS)) {
+                    CanCommitTransactionReply reply =
+                            CanCommitTransactionReply.fromSerializable(response);
+                    if (!reply.getCanCommit()) {
+                        result = false;
                     }
+                } else {
+                    LOG.error("Unexpected response type {}", response.getClass());
+                    returnFuture.setException(new IllegalArgumentException(
+                            String.format("Unexpected response type %s", response.getClass())));
+                    return;
                 }
-                if(LOG.isDebugEnabled()) {
-                    LOG.debug("Tx {}: canCommit returning result: {}", transactionId, result);
+
+                if(iterator.hasNext() && result){
+                    Future<Object> future = actorContext.executeOperationAsync(iterator.next(), message,
+                            actorContext.getTransactionCommitOperationTimeout());
+                    future.onComplete(this, actorContext.getClientDispatcher());
+                } else {
+                    if(LOG.isDebugEnabled()) {
+                        LOG.debug("Tx {}: canCommit returning result: {}", transactionId, result);
+                    }
+                    returnFuture.set(Boolean.valueOf(result));
                 }
-                returnFuture.set(Boolean.valueOf(result));
+
             }
-        }, actorContext.getClientDispatcher());
+        };
+
+        Future<Object> future = actorContext.executeOperationAsync(iterator.next(), message,
+                actorContext.getTransactionCommitOperationTimeout());
+        future.onComplete(onComplete, actorContext.getClientDispatcher());
     }
 
     private Future<Iterable<Object>> invokeCohorts(Object message) {
@@ -174,7 +185,7 @@ public class ThreePhaseCommitCohortProxy implements DOMStoreThreePhaseCommitCoho
     public ListenableFuture<Void> preCommit() {
         // We don't need to do anything here - preCommit is done atomically with the commit phase
         // by the shard.
-        return IMMEDIATE_SUCCESS;
+        return IMMEDIATE_VOID_SUCCESS;
     }
 
     @Override
@@ -191,8 +202,8 @@ public class ThreePhaseCommitCohortProxy implements DOMStoreThreePhaseCommitCoho
 
     @Override
     public ListenableFuture<Void> commit() {
-        OperationCallback operationCallback = (cohortFutures.size() == 0) ? NO_OP_CALLBACK :
-                new TransactionRateLimitingCallback(actorContext);
+        OperationCallback operationCallback = commitOperationCallback != null ? commitOperationCallback :
+            OperationCallback.NO_OP_CALLBACK;
 
         return voidOperation("commit", new CommitTransaction(transactionId).toSerializable(),
                 CommitTransactionReply.SERIALIZABLE_CLASS, true, operationCallback);
@@ -200,7 +211,8 @@ public class ThreePhaseCommitCohortProxy implements DOMStoreThreePhaseCommitCoho
 
     private ListenableFuture<Void> voidOperation(final String operationName, final Object message,
                                                  final Class<?> expectedResponseClass, final boolean propagateException) {
-        return voidOperation(operationName, message, expectedResponseClass, propagateException, NO_OP_CALLBACK);
+        return voidOperation(operationName, message, expectedResponseClass, propagateException,
+                OperationCallback.NO_OP_CALLBACK);
     }
 
     private ListenableFuture<Void> voidOperation(final String operationName, final Object message,
@@ -249,7 +261,7 @@ public class ThreePhaseCommitCohortProxy implements DOMStoreThreePhaseCommitCoho
             LOG.debug("Tx {} finish {}", transactionId, operationName);
         }
 
-        callback.run();
+        callback.resume();
 
         Future<Iterable<Object>> combinedFuture = invokeCohorts(message);
 
@@ -304,7 +316,7 @@ public class ThreePhaseCommitCohortProxy implements DOMStoreThreePhaseCommitCoho
         }, actorContext.getClientDispatcher());
     }
 
-    @VisibleForTesting
+    @Override
     List<Future<ActorSelection>> getCohortFutures() {
         return Collections.unmodifiableList(cohortFutures);
     }