Follow-up to protobuff deprecation
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / ShardCommitCoordinator.java
index 57c5b1de11fca6ad0496f9c4a3b06eb873048fdf..51d8d5caec18e0c94a22520fb23d1b9708acdbf4 100644 (file)
@@ -22,7 +22,6 @@ import java.util.Map;
 import java.util.Queue;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
-import org.opendaylight.controller.cluster.datastore.compat.BackwardsCompatibleThreePhaseCommitCohort;
 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
 import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
@@ -136,41 +135,22 @@ class ShardCommitCoordinator {
                 ready.getTransactionID(), ready.getTxnClientVersion());
 
         ShardDataTreeCohort cohort = ready.getTransaction().ready();
-        CohortEntry cohortEntry = new CohortEntry(ready.getTransactionID(), cohort);
+        CohortEntry cohortEntry = new CohortEntry(ready.getTransactionID(), cohort, ready.getTxnClientVersion());
         cohortCache.put(ready.getTransactionID(), cohortEntry);
 
         if(!queueCohortEntry(cohortEntry, sender, shard)) {
             return;
         }
 
-        if(ready.getTxnClientVersion() < DataStoreVersions.LITHIUM_VERSION) {
-            // Return our actor path as we'll handle the three phase commit except if the Tx client
-            // version < Helium-1 version which means the Tx was initiated by a base Helium version node.
-            // In that case, the subsequent 3-phase commit messages won't contain the transactionId so to
-            // maintain backwards compatibility, we create a separate cohort actor to provide the compatible behavior.
-            ActorRef replyActorPath = shard.self();
-            if(ready.getTxnClientVersion() < DataStoreVersions.HELIUM_1_VERSION) {
-                log.debug("{}: Creating BackwardsCompatibleThreePhaseCommitCohort", name);
-                replyActorPath = shard.getContext().actorOf(BackwardsCompatibleThreePhaseCommitCohort.props(
-                        ready.getTransactionID()));
-            }
-
-            ReadyTransactionReply readyTransactionReply =
-                    new ReadyTransactionReply(Serialization.serializedActorPath(replyActorPath),
-                            ready.getTxnClientVersion());
-            sender.tell(ready.isReturnSerialized() ? readyTransactionReply.toSerializable() :
-                readyTransactionReply, shard.self());
+        if(ready.isDoImmediateCommit()) {
+            cohortEntry.setDoImmediateCommit(true);
+            cohortEntry.setReplySender(sender);
+            cohortEntry.setShard(shard);
+            handleCanCommit(cohortEntry);
         } else {
-            if(ready.isDoImmediateCommit()) {
-                cohortEntry.setDoImmediateCommit(true);
-                cohortEntry.setReplySender(sender);
-                cohortEntry.setShard(shard);
-                handleCanCommit(cohortEntry);
-            } else {
-                // The caller does not want immediate commit - the 3-phase commit will be coordinated by the
-                // front-end so send back a ReadyTransactionReply with our actor path.
-                sender.tell(readyTransactionReply(shard), shard.self());
-            }
+            // The caller does not want immediate commit - the 3-phase commit will be coordinated by the
+            // front-end so send back a ReadyTransactionReply with our actor path.
+            sender.tell(readyTransactionReply(shard), shard.self());
         }
     }
 
@@ -189,7 +169,7 @@ class ShardCommitCoordinator {
         if(cohortEntry == null) {
             cohortEntry = new CohortEntry(batched.getTransactionID(),
                     dataTree.newReadWriteTransaction(batched.getTransactionID(),
-                        batched.getTransactionChainID()));
+                        batched.getTransactionChainID()), batched.getVersion());
             cohortCache.put(batched.getTransactionID(), cohortEntry);
         }
 
@@ -247,7 +227,8 @@ class ShardCommitCoordinator {
     void handleReadyLocalTransaction(ReadyLocalTransaction message, ActorRef sender, Shard shard) {
         final ShardDataTreeCohort cohort = new SimpleShardDataTreeCohort(dataTree, message.getModification(),
                 message.getTransactionID());
-        final CohortEntry cohortEntry = new CohortEntry(message.getTransactionID(), cohort);
+        final CohortEntry cohortEntry = new CohortEntry(message.getTransactionID(), cohort,
+                DataStoreVersions.CURRENT_VERSION);
         cohortCache.put(message.getTransactionID(), cohortEntry);
         cohortEntry.setDoImmediateCommit(message.isDoCommitOnReady());
 
@@ -339,9 +320,11 @@ class ShardCommitCoordinator {
                                 "Can Commit failed, no detailed cause available.")), cohortEntry.getShard().self());
                 }
             } else {
+                // FIXME - use caller's version
                 cohortEntry.getReplySender().tell(
-                        canCommit ? CanCommitTransactionReply.YES.toSerializable() :
-                            CanCommitTransactionReply.NO.toSerializable(), cohortEntry.getShard().self());
+                        canCommit ? CanCommitTransactionReply.yes(cohortEntry.getClientVersion()).toSerializable() :
+                            CanCommitTransactionReply.no(cohortEntry.getClientVersion()).toSerializable(),
+                        cohortEntry.getShard().self());
             }
         } catch (Exception e) {
             log.debug("{}: An exception occurred during canCommit", name, e);
@@ -440,7 +423,7 @@ class ShardCommitCoordinator {
             shard.getShardMBean().incrementAbortTransactionsCount();
 
             if(sender != null) {
-                sender.tell(new AbortTransactionReply().toSerializable(), self);
+                sender.tell(AbortTransactionReply.instance(cohortEntry.getClientVersion()).toSerializable(), self);
             }
         } catch (Exception e) {
             log.error("{}: An exception happened during abort", name, e);
@@ -606,16 +589,19 @@ class ShardCommitCoordinator {
         private final Stopwatch lastAccessTimer = Stopwatch.createStarted();
         private int totalBatchedModificationsReceived;
         private boolean aborted;
+        private final short clientVersion;
 
-        CohortEntry(String transactionID, ReadWriteShardDataTreeTransaction transaction) {
+        CohortEntry(String transactionID, ReadWriteShardDataTreeTransaction transaction, short clientVersion) {
             this.transaction = Preconditions.checkNotNull(transaction);
             this.transactionID = transactionID;
+            this.clientVersion = clientVersion;
         }
 
-        CohortEntry(String transactionID, ShardDataTreeCohort cohort) {
+        CohortEntry(String transactionID, ShardDataTreeCohort cohort, short clientVersion) {
             this.transactionID = transactionID;
             this.cohort = cohort;
             this.transaction = null;
+            this.clientVersion = clientVersion;
         }
 
         void updateLastAccessTime() {
@@ -627,6 +613,10 @@ class ShardCommitCoordinator {
             return transactionID;
         }
 
+        short getClientVersion() {
+            return clientVersion;
+        }
+
         DataTreeCandidate getCandidate() {
             return cohort.getCandidate();
         }