Elide front-end 3PC for single-shard Tx
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / ShardCommitCoordinator.java
index b96e38d76a45aced0e1c326c051cb7fcbd1d82d1..4ff9b5fd4353e5857ec6533c7a0cfc4ee6ec4ee2 100644 (file)
@@ -21,11 +21,15 @@ import java.util.LinkedList;
 import java.util.Queue;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
+import org.opendaylight.controller.cluster.datastore.compat.BackwardsCompatibleThreePhaseCommitCohort;
 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
-import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
+import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
+import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
+import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
 import org.opendaylight.controller.cluster.datastore.modification.Modification;
 import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
 import org.slf4j.Logger;
@@ -56,8 +60,6 @@ public class ShardCommitCoordinator {
 
     private final String name;
 
-    private final String shardActorPath;
-
     private final RemovalListener<String, CohortEntry> cacheRemovalListener =
             new RemovalListener<String, CohortEntry>() {
                 @Override
@@ -71,6 +73,8 @@ public class ShardCommitCoordinator {
     // This is a hook for unit tests to replace or decorate the DOMStoreThreePhaseCommitCohorts.
     private CohortDecorator cohortDecorator;
 
+    private ReadyTransactionReply readyTransactionReply;
+
     public ShardCommitCoordinator(DOMTransactionFactory transactionFactory,
             long cacheExpiryTimeoutInSec, int queueCapacity, ActorRef shardActor, Logger log, String name) {
 
@@ -79,8 +83,6 @@ public class ShardCommitCoordinator {
         this.name = name;
         this.transactionFactory = transactionFactory;
 
-        shardActorPath = Serialization.serializedActorPath(shardActor);
-
         cohortCache = CacheBuilder.newBuilder().expireAfterAccess(cacheExpiryTimeoutInSec, TimeUnit.SECONDS).
                 removalListener(cacheRemovalListener).build();
 
@@ -93,18 +95,55 @@ public class ShardCommitCoordinator {
         this.queueCapacity = queueCapacity;
     }
 
+    private ReadyTransactionReply readyTransactionReply(Shard shard) {
+        if(readyTransactionReply == null) {
+            readyTransactionReply = new ReadyTransactionReply(Serialization.serializedActorPath(shard.self()));
+        }
+
+        return readyTransactionReply;
+    }
+
     /**
      * This method is called to ready a transaction that was prepared by ShardTransaction actor. It caches
      * the prepared cohort entry for the given transactions ID in preparation for the subsequent 3-phase commit.
-     *
-     * @param transactionID the ID of the transaction
-     * @param cohort the cohort to participate in the transaction commit
-     * @param modification the modifications made by the transaction
      */
-    public void transactionReady(String transactionID, DOMStoreThreePhaseCommitCohort cohort,
-            MutableCompositeModification modification) {
+    public void handleForwardedReadyTransaction(ForwardedReadyTransaction ready, ActorRef sender, Shard shard) {
+        log.debug("{}: Readying transaction {}, client version {}", name,
+                ready.getTransactionID(), ready.getTxnClientVersion());
+
+        CohortEntry cohortEntry = new CohortEntry(ready.getTransactionID(), ready.getCohort(),
+                (MutableCompositeModification) ready.getModification());
+        cohortCache.put(ready.getTransactionID(), cohortEntry);
+
+        if(ready.getTxnClientVersion() < DataStoreVersions.LITHIUM_VERSION) {
+            // Return our actor path as we'll handle the three phase commit except if the Tx client
+            // version < Helium-1 version which means the Tx was initiated by a base Helium version node.
+            // In that case, the subsequent 3-phase commit messages won't contain the transactionId so to
+            // maintain backwards compatibility, we create a separate cohort actor to provide the compatible behavior.
+            ActorRef replyActorPath = shard.self();
+            if(ready.getTxnClientVersion() < DataStoreVersions.HELIUM_1_VERSION) {
+                log.debug("{}: Creating BackwardsCompatibleThreePhaseCommitCohort", name);
+                replyActorPath = shard.getContext().actorOf(BackwardsCompatibleThreePhaseCommitCohort.props(
+                        ready.getTransactionID()));
+            }
 
-        cohortCache.put(transactionID, new CohortEntry(transactionID, cohort, modification));
+            ReadyTransactionReply readyTransactionReply =
+                    new ReadyTransactionReply(Serialization.serializedActorPath(replyActorPath),
+                            ready.getTxnClientVersion());
+            sender.tell(ready.isReturnSerialized() ? readyTransactionReply.toSerializable() :
+                readyTransactionReply, shard.self());
+        } else {
+            if(ready.isDoImmediateCommit()) {
+                cohortEntry.setDoImmediateCommit(true);
+                cohortEntry.setReplySender(sender);
+                cohortEntry.setShard(shard);
+                handleCanCommit(cohortEntry);
+            } else {
+                // The caller does not want immediate commit - the 3-phase commit will be coordinated by the
+                // front-end so send back a ReadyTransactionReply with our actor path.
+                sender.tell(readyTransactionReply(shard), shard.self());
+            }
+        }
     }
 
     /**
@@ -118,7 +157,7 @@ public class ShardCommitCoordinator {
      *
      * @throws ExecutionException if an error occurs loading the cache
      */
-    public boolean handleTransactionModifications(BatchedModifications batched)
+    boolean handleBatchedModifications(BatchedModifications batched, ActorRef sender, Shard shard)
             throws ExecutionException {
         CohortEntry cohortEntry = cohortCache.getIfPresent(batched.getTransactionID());
         if(cohortEntry == null) {
@@ -142,43 +181,30 @@ public class ShardCommitCoordinator {
                         batched.getTransactionID(), batched.getVersion());
             }
 
-            cohortEntry.ready(cohortDecorator);
+            cohortEntry.ready(cohortDecorator, batched.isDoCommitOnReady());
+
+            if(batched.isDoCommitOnReady()) {
+                cohortEntry.setReplySender(sender);
+                cohortEntry.setShard(shard);
+                handleCanCommit(cohortEntry);
+            } else {
+                sender.tell(readyTransactionReply(shard), shard.self());
+            }
+        } else {
+            sender.tell(new BatchedModificationsReply(batched.getModifications().size()), shard.self());
         }
 
         return batched.isReady();
     }
 
-    /**
-     * This method handles the canCommit phase for a transaction.
-     *
-     * @param canCommit the CanCommitTransaction message
-     * @param sender the actor that sent the message
-     * @param shard the transaction's shard actor
-     */
-    public void handleCanCommit(CanCommitTransaction canCommit, final ActorRef sender,
-            final ActorRef shard) {
-        String transactionID = canCommit.getTransactionID();
+    private void handleCanCommit(CohortEntry cohortEntry) {
+        String transactionID = cohortEntry.getTransactionID();
+
         if(log.isDebugEnabled()) {
             log.debug("{}: Processing canCommit for transaction {} for shard {}",
-                    name, transactionID, shard.path());
-        }
-
-        // Lookup the cohort entry that was cached previously (or should have been) by
-        // transactionReady (via the ForwardedReadyTransaction message).
-        final CohortEntry cohortEntry = cohortCache.getIfPresent(transactionID);
-        if(cohortEntry == null) {
-            // Either canCommit was invoked before ready(shouldn't happen)  or a long time passed
-            // between canCommit and ready and the entry was expired from the cache.
-            IllegalStateException ex = new IllegalStateException(
-                    String.format("%s: No cohort entry found for transaction %s", name, transactionID));
-            log.error(ex.getMessage());
-            sender.tell(new Status.Failure(ex), shard);
-            return;
+                    name, transactionID, cohortEntry.getShard().self().path());
         }
 
-        cohortEntry.setCanCommitSender(sender);
-        cohortEntry.setShard(shard);
-
         if(currentCohortEntry != null) {
             // There's already a Tx commit in progress - attempt to queue this entry to be
             // committed after the current Tx completes.
@@ -195,7 +221,7 @@ public class ShardCommitCoordinator {
                                       " capacity %d has been reached.",
                                       name, transactionID, queueCapacity));
                 log.error(ex.getMessage());
-                sender.tell(new Status.Failure(ex), shard);
+                cohortEntry.getReplySender().tell(new Status.Failure(ex), cohortEntry.getShard().self());
             }
         } else {
             // No Tx commit currently in progress - make this the current entry and proceed with
@@ -207,29 +233,119 @@ public class ShardCommitCoordinator {
         }
     }
 
+    /**
+     * This method handles the canCommit phase for a transaction.
+     *
+     * @param canCommit the CanCommitTransaction message
+     * @param sender the actor that sent the message
+     * @param shard the transaction's shard actor
+     */
+    public void handleCanCommit(String transactionID, final ActorRef sender, final Shard shard) {
+        // Lookup the cohort entry that was cached previously (or should have been) by
+        // transactionReady (via the ForwardedReadyTransaction message).
+        final CohortEntry cohortEntry = cohortCache.getIfPresent(transactionID);
+        if(cohortEntry == null) {
+            // Either canCommit was invoked before ready(shouldn't happen)  or a long time passed
+            // between canCommit and ready and the entry was expired from the cache.
+            IllegalStateException ex = new IllegalStateException(
+                    String.format("%s: No cohort entry found for transaction %s", name, transactionID));
+            log.error(ex.getMessage());
+            sender.tell(new Status.Failure(ex), shard.self());
+            return;
+        }
+
+        cohortEntry.setReplySender(sender);
+        cohortEntry.setShard(shard);
+
+        handleCanCommit(cohortEntry);
+    }
+
     private void doCanCommit(final CohortEntry cohortEntry) {
 
+        boolean canCommit = false;
         try {
             // We block on the future here so we don't have to worry about possibly accessing our
             // state on a different thread outside of our dispatcher. Also, the data store
             // currently uses a same thread executor anyway.
-            Boolean canCommit = cohortEntry.getCohort().canCommit().get();
+            canCommit = cohortEntry.getCohort().canCommit().get();
+
+            if(cohortEntry.isDoImmediateCommit()) {
+                if(canCommit) {
+                    doCommit(cohortEntry);
+                } else {
+                    cohortEntry.getReplySender().tell(new Status.Failure(new TransactionCommitFailedException(
+                                "Can Commit failed, no detailed cause available.")), cohortEntry.getShard().self());
+                }
+            } else {
+                cohortEntry.getReplySender().tell(
+                        canCommit ? CanCommitTransactionReply.YES.toSerializable() :
+                            CanCommitTransactionReply.NO.toSerializable(), cohortEntry.getShard().self());
+            }
+        } catch (Exception e) {
+            log.debug("{}: An exception occurred during canCommit: {}", name, e);
 
-            cohortEntry.getCanCommitSender().tell(
-                    canCommit ? CanCommitTransactionReply.YES.toSerializable() :
-                        CanCommitTransactionReply.NO.toSerializable(), cohortEntry.getShard());
+            Throwable failure = e;
+            if(e instanceof ExecutionException) {
+                failure = e.getCause();
+            }
 
+            cohortEntry.getReplySender().tell(new Status.Failure(failure), cohortEntry.getShard().self());
+        } finally {
             if(!canCommit) {
-                // Remove the entry from the cache now since the Tx will be aborted.
-                removeCohortEntry(cohortEntry.getTransactionID());
+                // Remove the entry from the cache now.
+                currentTransactionComplete(cohortEntry.getTransactionID(), true);
             }
-        } catch (InterruptedException | ExecutionException e) {
-            log.debug("{}: An exception occurred during canCommit: {}", name, e);
+        }
+    }
+
+    private boolean doCommit(CohortEntry cohortEntry) {
+        log.debug("{}: Committing transaction {}", name, cohortEntry.getTransactionID());
 
-            // Remove the entry from the cache now since the Tx will be aborted.
-            removeCohortEntry(cohortEntry.getTransactionID());
-            cohortEntry.getCanCommitSender().tell(new Status.Failure(e), cohortEntry.getShard());
+        boolean success = false;
+
+        // We perform the preCommit phase here atomically with the commit phase. This is an
+        // optimization to eliminate the overhead of an extra preCommit message. We lose front-end
+        // coordination of preCommit across shards in case of failure but preCommit should not
+        // normally fail since we ensure only one concurrent 3-phase commit.
+
+        try {
+            // We block on the future here so we don't have to worry about possibly accessing our
+            // state on a different thread outside of our dispatcher. Also, the data store
+            // currently uses a same thread executor anyway.
+            cohortEntry.getCohort().preCommit().get();
+
+            cohortEntry.getShard().continueCommit(cohortEntry);
+
+            cohortEntry.updateLastAccessTime();
+
+            success = true;
+        } catch (Exception e) {
+            log.error("{} An exception occurred while preCommitting transaction {}",
+                    name, cohortEntry.getTransactionID(), e);
+            cohortEntry.getReplySender().tell(new akka.actor.Status.Failure(e), cohortEntry.getShard().self());
+
+            currentTransactionComplete(cohortEntry.getTransactionID(), true);
+        }
+
+        return success;
+    }
+
+    boolean handleCommit(final String transactionID, final ActorRef sender, final Shard shard) {
+        // Get the current in-progress cohort entry in the commitCoordinator if it corresponds to
+        // this transaction.
+        final CohortEntry cohortEntry = getCohortEntryIfCurrent(transactionID);
+        if(cohortEntry == null) {
+            // We're not the current Tx - the Tx was likely expired b/c it took too long in
+            // between the canCommit and commit messages.
+            IllegalStateException ex = new IllegalStateException(
+                    String.format("%s: Cannot commit transaction %s - it is not the current transaction",
+                            name, transactionID));
+            log.error(ex.getMessage());
+            sender.tell(new akka.actor.Status.Failure(ex), shard.self());
+            return false;
         }
+
+        return doCommit(cohortEntry);
     }
 
     /**
@@ -302,9 +418,10 @@ public class ShardCommitCoordinator {
         private DOMStoreThreePhaseCommitCohort cohort;
         private final MutableCompositeModification compositeModification;
         private final DOMStoreWriteTransaction transaction;
-        private ActorRef canCommitSender;
-        private ActorRef shard;
+        private ActorRef replySender;
+        private Shard shard;
         private long lastAccessTime;
+        private boolean doImmediateCommit;
 
         CohortEntry(String transactionID, DOMStoreWriteTransaction transaction) {
             this.compositeModification = new MutableCompositeModification();
@@ -347,9 +464,11 @@ public class ShardCommitCoordinator {
             }
         }
 
-        void ready(CohortDecorator cohortDecorator) {
+        void ready(CohortDecorator cohortDecorator, boolean doImmediateCommit) {
             Preconditions.checkState(cohort == null, "cohort was already set");
 
+            setDoImmediateCommit(doImmediateCommit);
+
             cohort = transaction.ready();
 
             if(cohortDecorator != null) {
@@ -358,19 +477,27 @@ public class ShardCommitCoordinator {
             }
         }
 
-        ActorRef getCanCommitSender() {
-            return canCommitSender;
+        boolean isDoImmediateCommit() {
+            return doImmediateCommit;
+        }
+
+        void setDoImmediateCommit(boolean doImmediateCommit) {
+            this.doImmediateCommit = doImmediateCommit;
+        }
+
+        ActorRef getReplySender() {
+            return replySender;
         }
 
-        void setCanCommitSender(ActorRef canCommitSender) {
-            this.canCommitSender = canCommitSender;
+        void setReplySender(ActorRef replySender) {
+            this.replySender = replySender;
         }
 
-        ActorRef getShard() {
+        Shard getShard() {
             return shard;
         }
 
-        void setShard(ActorRef shard) {
+        void setShard(Shard shard) {
             this.shard = shard;
         }