Merge "Elide front-end 3PC for single-shard Tx"
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / Shard.java
index 7110adc625ac52aa6b54717052a8f511beef89a7..91e072b076ef7c68116009c94127df859ef7540b 100644 (file)
@@ -30,7 +30,6 @@ import javax.annotation.Nonnull;
 import org.opendaylight.controller.cluster.common.actor.CommonConfig;
 import org.opendaylight.controller.cluster.common.actor.MeteringBehavior;
 import org.opendaylight.controller.cluster.datastore.ShardCommitCoordinator.CohortEntry;
-import org.opendaylight.controller.cluster.datastore.compat.BackwardsCompatibleThreePhaseCommitCohort;
 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardTransactionIdentifier;
@@ -40,7 +39,6 @@ import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
-import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionChain;
 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
@@ -49,7 +47,6 @@ import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
-import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener;
 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
@@ -107,9 +104,6 @@ public class Shard extends RaftActor {
 
     private final MessageTracker appendEntriesReplyTracker;
 
-    private final ReadyTransactionReply READY_TRANSACTION_REPLY = new ReadyTransactionReply(
-            Serialization.serializedActorPath(getSelf()));
-
     private final DOMTransactionFactory domTransactionFactory;
 
     private final ShardTransactionActorFactory transactionActorFactory;
@@ -240,7 +234,8 @@ public class Shard extends RaftActor {
             } else if (BatchedModifications.class.isInstance(message)) {
                 handleBatchedModifications((BatchedModifications)message);
             } else if (message instanceof ForwardedReadyTransaction) {
-                handleForwardedReadyTransaction((ForwardedReadyTransaction) message);
+                commitCoordinator.handleForwardedReadyTransaction((ForwardedReadyTransaction) message,
+                        getSender(), this);
             } else if (CanCommitTransaction.SERIALIZABLE_CLASS.isInstance(message)) {
                 handleCanCommitTransaction(CanCommitTransaction.fromSerializable(message));
             } else if (CommitTransaction.SERIALIZABLE_CLASS.isInstance(message)) {
@@ -310,54 +305,22 @@ public class Shard extends RaftActor {
         }
     }
 
-    private void handleCommitTransaction(final CommitTransaction commit) {
-        final String transactionID = commit.getTransactionID();
-
-        LOG.debug("{}: Committing transaction {}", persistenceId(), transactionID);
-
-        // Get the current in-progress cohort entry in the commitCoordinator if it corresponds to
-        // this transaction.
-        final CohortEntry cohortEntry = commitCoordinator.getCohortEntryIfCurrent(transactionID);
-        if(cohortEntry == null) {
-            // We're not the current Tx - the Tx was likely expired b/c it took too long in
-            // between the canCommit and commit messages.
-            IllegalStateException ex = new IllegalStateException(
-                    String.format("%s: Cannot commit transaction %s - it is not the current transaction",
-                            persistenceId(), transactionID));
-            LOG.error(ex.getMessage());
-            shardMBean.incrementFailedTransactionsCount();
-            getSender().tell(new akka.actor.Status.Failure(ex), getSelf());
-            return;
+    void continueCommit(final CohortEntry cohortEntry) throws Exception {
+        // If we do not have any followers and we are not using persistence
+        // or if cohortEntry has no modifications
+        // we can apply modification to the state immediately
+        if((!hasFollowers() && !persistence().isRecoveryApplicable()) || (!cohortEntry.hasModifications())){
+            applyModificationToState(getSender(), cohortEntry.getTransactionID(), cohortEntry.getModification());
+        } else {
+            Shard.this.persistData(getSender(), cohortEntry.getTransactionID(),
+                    new ModificationPayload(cohortEntry.getModification()));
         }
+    }
 
-        // We perform the preCommit phase here atomically with the commit phase. This is an
-        // optimization to eliminate the overhead of an extra preCommit message. We lose front-end
-        // coordination of preCommit across shards in case of failure but preCommit should not
-        // normally fail since we ensure only one concurrent 3-phase commit.
-
-        try {
-            // We block on the future here so we don't have to worry about possibly accessing our
-            // state on a different thread outside of our dispatcher. Also, the data store
-            // currently uses a same thread executor anyway.
-            cohortEntry.getCohort().preCommit().get();
-
-            // If we do not have any followers and we are not using persistence
-            // or if cohortEntry has no modifications
-            // we can apply modification to the state immediately
-            if((!hasFollowers() && !persistence().isRecoveryApplicable()) || (!cohortEntry.hasModifications())){
-                applyModificationToState(getSender(), transactionID, cohortEntry.getModification());
-            } else {
-                Shard.this.persistData(getSender(), transactionID,
-                        new ModificationPayload(cohortEntry.getModification()));
-            }
-        } catch (Exception e) {
-            LOG.error("{} An exception occurred while preCommitting transaction {}",
-                    persistenceId(), cohortEntry.getTransactionID(), e);
+    private void handleCommitTransaction(final CommitTransaction commit) {
+        if(!commitCoordinator.handleCommit(commit.getTransactionID(), getSender(), this)) {
             shardMBean.incrementFailedTransactionsCount();
-            getSender().tell(new akka.actor.Status.Failure(e), getSelf());
         }
-
-        cohortEntry.updateLastAccessTime();
     }
 
     private void finishCommit(@Nonnull final ActorRef sender, final @Nonnull String transactionID) {
@@ -415,7 +378,7 @@ public class Shard extends RaftActor {
 
     private void handleCanCommitTransaction(final CanCommitTransaction canCommit) {
         LOG.debug("{}: Can committing transaction {}", persistenceId(), canCommit.getTransactionID());
-        commitCoordinator.handleCanCommit(canCommit, getSender(), self());
+        commitCoordinator.handleCanCommit(canCommit.getTransactionID(), getSender(), this);
     }
 
     private void handleBatchedModifications(BatchedModifications batched) {
@@ -433,12 +396,7 @@ public class Shard extends RaftActor {
         //
         if(isLeader()) {
             try {
-                boolean ready = commitCoordinator.handleTransactionModifications(batched);
-                if(ready) {
-                    sender().tell(READY_TRANSACTION_REPLY, self());
-                } else {
-                    sender().tell(new BatchedModificationsReply(batched.getModifications().size()), self());
-                }
+                commitCoordinator.handleBatchedModifications(batched, getSender(), this);
             } catch (Exception e) {
                 LOG.error("{}: Error handling BatchedModifications for Tx {}", persistenceId(),
                         batched.getTransactionID(), e);
@@ -463,39 +421,6 @@ public class Shard extends RaftActor {
         }
     }
 
-    private void handleForwardedReadyTransaction(ForwardedReadyTransaction ready) {
-        LOG.debug("{}: Readying transaction {}, client version {}", persistenceId(),
-                ready.getTransactionID(), ready.getTxnClientVersion());
-
-        // This message is forwarded by the ShardTransaction on ready. We cache the cohort in the
-        // commitCoordinator in preparation for the subsequent three phase commit initiated by
-        // the front-end.
-        commitCoordinator.transactionReady(ready.getTransactionID(), ready.getCohort(),
-                (MutableCompositeModification) ready.getModification());
-
-        // Return our actor path as we'll handle the three phase commit, except if the Tx client
-        // version < 1 (Helium-1 version). This means the Tx was initiated by a base Helium version
-        // node. In that case, the subsequent 3-phase commit messages won't contain the
-        // transactionId so to maintain backwards compatibility, we create a separate cohort actor
-        // to provide the compatible behavior.
-        if(ready.getTxnClientVersion() < DataStoreVersions.LITHIUM_VERSION) {
-            ActorRef replyActorPath = getSelf();
-            if(ready.getTxnClientVersion() < DataStoreVersions.HELIUM_1_VERSION) {
-                LOG.debug("{}: Creating BackwardsCompatibleThreePhaseCommitCohort", persistenceId());
-                replyActorPath = getContext().actorOf(BackwardsCompatibleThreePhaseCommitCohort.props(
-                        ready.getTransactionID()));
-            }
-
-            ReadyTransactionReply readyTransactionReply =
-                    new ReadyTransactionReply(Serialization.serializedActorPath(replyActorPath),
-                            ready.getTxnClientVersion());
-            getSender().tell(ready.isReturnSerialized() ? readyTransactionReply.toSerializable() :
-                readyTransactionReply, getSelf());
-        } else {
-            getSender().tell(READY_TRANSACTION_REPLY, getSelf());
-        }
-    }
-
     private void handleAbortTransaction(final AbortTransaction abort) {
         doAbortTransaction(abort.getTransactionID(), getSender());
     }