Merge "Use BatchedModifications message in place of ReadyTransaction message"
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / Shard.java
index 17f1abb92cc4ec7d4d0a7dffb818cec20b5cff0b..b2a86376941d4e8e609d8e3f976041b9f7e665b3 100644 (file)
@@ -439,8 +439,12 @@ public class Shard extends RaftActor {
         //
         if(isLeader()) {
             try {
-                BatchedModificationsReply reply = commitCoordinator.handleTransactionModifications(batched);
-                sender().tell(reply, self());
+                boolean ready = commitCoordinator.handleTransactionModifications(batched);
+                if(ready) {
+                    sender().tell(READY_TRANSACTION_REPLY, self());
+                } else {
+                    sender().tell(new BatchedModificationsReply(batched.getModifications().size()), self());
+                }
             } catch (Exception e) {
                 LOG.error("{}: Error handling BatchedModifications for Tx {}", persistenceId(),
                         batched.getTransactionID(), e);
@@ -480,20 +484,21 @@ public class Shard extends RaftActor {
         // node. In that case, the subsequent 3-phase commit messages won't contain the
         // transactionId so to maintain backwards compatibility, we create a separate cohort actor
         // to provide the compatible behavior.
-        if(ready.getTxnClientVersion() < DataStoreVersions.HELIUM_1_VERSION) {
-            LOG.debug("{}: Creating BackwardsCompatibleThreePhaseCommitCohort", persistenceId());
-            ActorRef replyActorPath = getContext().actorOf(BackwardsCompatibleThreePhaseCommitCohort.props(
-                    ready.getTransactionID()));
+        if(ready.getTxnClientVersion() < DataStoreVersions.LITHIUM_VERSION) {
+            ActorRef replyActorPath = getSelf();
+            if(ready.getTxnClientVersion() < DataStoreVersions.HELIUM_1_VERSION) {
+                LOG.debug("{}: Creating BackwardsCompatibleThreePhaseCommitCohort", persistenceId());
+                replyActorPath = getContext().actorOf(BackwardsCompatibleThreePhaseCommitCohort.props(
+                        ready.getTransactionID()));
+            }
 
             ReadyTransactionReply readyTransactionReply =
-                    new ReadyTransactionReply(Serialization.serializedActorPath(replyActorPath));
+                    new ReadyTransactionReply(Serialization.serializedActorPath(replyActorPath),
+                            ready.getTxnClientVersion());
             getSender().tell(ready.isReturnSerialized() ? readyTransactionReply.toSerializable() :
-                    readyTransactionReply, getSelf());
-
+                readyTransactionReply, getSelf());
         } else {
-
-            getSender().tell(ready.isReturnSerialized() ? READY_TRANSACTION_REPLY.toSerializable() :
-                    READY_TRANSACTION_REPLY, getSelf());
+            getSender().tell(READY_TRANSACTION_REPLY, getSelf());
         }
     }