BUG-5280: move DataTreeCandidate serialization to its own class
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / ShardCommitCoordinator.java
index f313329c7070a1fd582bf045888321493ee4074a..822586e9633a7085860c8191f57e767aa59bed6f 100644 (file)
@@ -34,6 +34,7 @@ import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransact
 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
 import org.opendaylight.controller.cluster.datastore.utils.AbstractBatchedModificationsCursor;
 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
+import org.opendaylight.yangtools.concepts.Identifier;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import org.slf4j.Logger;
 
@@ -46,10 +47,10 @@ final class ShardCommitCoordinator {
 
     // Interface hook for unit tests to replace or decorate the DOMStoreThreePhaseCommitCohorts.
     public interface CohortDecorator {
-        ShardDataTreeCohort decorate(String transactionID, ShardDataTreeCohort actual);
+        ShardDataTreeCohort decorate(Identifier transactionID, ShardDataTreeCohort actual);
     }
 
-    private final Map<String, CohortEntry> cohortCache = new HashMap<>();
+    private final Map<Identifier, CohortEntry> cohortCache = new HashMap<>();
 
     private CohortEntry currentCohortEntry;
 
@@ -143,7 +144,7 @@ final class ShardCommitCoordinator {
 
         final ShardDataTreeCohort cohort = ready.getTransaction().ready();
         final CohortEntry cohortEntry = new CohortEntry(ready.getTransactionID(), cohort, cohortRegistry, schema, ready.getTxnClientVersion());
-        cohortCache.put(ready.getTransactionID(), cohortEntry);
+        cohortCache.put(cohortEntry.getTransactionID(), cohortEntry);
 
         if(!queueCohortEntry(cohortEntry, sender, shard)) {
             return;
@@ -175,9 +176,9 @@ final class ShardCommitCoordinator {
         CohortEntry cohortEntry = cohortCache.get(batched.getTransactionID());
         if(cohortEntry == null) {
             cohortEntry = new CohortEntry(batched.getTransactionID(),
-                    dataTree.newReadWriteTransaction(batched.getTransactionID(), batched.getTransactionChainID()),
+                    dataTree.newReadWriteTransaction(batched.getTransactionID()),
                     cohortRegistry, schema,  batched.getVersion());
-            cohortCache.put(batched.getTransactionID(), cohortEntry);
+            cohortCache.put(cohortEntry.getTransactionID(), cohortEntry);
         }
 
         if(log.isDebugEnabled()) {
@@ -238,7 +239,7 @@ final class ShardCommitCoordinator {
                 message.getTransactionID());
         final CohortEntry cohortEntry = new CohortEntry(message.getTransactionID(), cohort, cohortRegistry, schema,
                 DataStoreVersions.CURRENT_VERSION);
-        cohortCache.put(message.getTransactionID(), cohortEntry);
+        cohortCache.put(cohortEntry.getTransactionID(), cohortEntry);
         cohortEntry.setDoImmediateCommit(message.isDoCommitOnReady());
 
         if(!queueCohortEntry(cohortEntry, sender, shard)) {
@@ -271,8 +272,7 @@ final class ShardCommitCoordinator {
             protected BatchedModifications getModifications() {
                 if(newModifications.isEmpty() ||
                         newModifications.getLast().getModifications().size() >= maxModificationsPerBatch) {
-                    newModifications.add(new BatchedModifications(from.getTransactionID(),
-                            from.getVersion(), from.getTransactionChainID()));
+                    newModifications.add(new BatchedModifications(from.getTransactionID(), from.getVersion()));
                 }
 
                 return newModifications.getLast();
@@ -287,8 +287,6 @@ final class ShardCommitCoordinator {
     }
 
     private void handleCanCommit(CohortEntry cohortEntry) {
-        String transactionID = cohortEntry.getTransactionID();
-
         cohortEntry.updateLastAccessTime();
 
         if(currentCohortEntry != null) {
@@ -297,7 +295,7 @@ final class ShardCommitCoordinator {
 
             if(log.isDebugEnabled()) {
                 log.debug("{}: Commit for Tx {} already in progress - skipping canCommit for {} for now",
-                        name, currentCohortEntry.getTransactionID(), transactionID);
+                        name, currentCohortEntry.getTransactionID(), cohortEntry.getTransactionID());
             }
 
             return;
@@ -313,7 +311,7 @@ final class ShardCommitCoordinator {
             if(log.isDebugEnabled()) {
                 log.debug("{}: Tx {} is the next pending canCommit - skipping {} for now", name,
                         queuedCohortEntries.peek() != null ? queuedCohortEntries.peek().getTransactionID() : "???",
-                                transactionID);
+                                cohortEntry.getTransactionID());
             }
         }
     }
@@ -325,7 +323,7 @@ final class ShardCommitCoordinator {
      * @param sender the actor to which to send the response
      * @param shard the transaction's shard actor
      */
-    void handleCanCommit(String transactionID, final ActorRef sender, final Shard shard) {
+    void handleCanCommit(Identifier transactionID, final ActorRef sender, final Shard shard) {
         // Lookup the cohort entry that was cached previously (or should have been) by
         // transactionReady (via the ForwardedReadyTransaction message).
         final CohortEntry cohortEntry = cohortCache.get(transactionID);
@@ -419,7 +417,7 @@ final class ShardCommitCoordinator {
      * @param shard the transaction's shard actor
      * @return true if the transaction was successfully prepared, false otherwise.
      */
-    boolean handleCommit(final String transactionID, final ActorRef sender, final Shard shard) {
+    boolean handleCommit(final Identifier transactionID, final ActorRef sender, final Shard shard) {
         // Get the current in-progress cohort entry in the commitCoordinator if it corresponds to
         // this transaction.
         final CohortEntry cohortEntry = getCohortEntryIfCurrent(transactionID);
@@ -438,7 +436,7 @@ final class ShardCommitCoordinator {
         return doCommit(cohortEntry);
     }
 
-    void handleAbort(final String transactionID, final ActorRef sender, final Shard shard) {
+    void handleAbort(final Identifier transactionID, final ActorRef sender, final Shard shard) {
         CohortEntry cohortEntry = getCohortEntryIfCurrent(transactionID);
         if(cohortEntry != null) {
             // We don't remove the cached cohort entry here (ie pass false) in case the Tx was
@@ -540,7 +538,7 @@ final class ShardCommitCoordinator {
                     if(newModifications.isEmpty() ||
                             newModifications.getLast().getModifications().size() >= maxModificationsPerBatch) {
                         newModifications.add(new BatchedModifications(cohortEntry.getTransactionID(),
-                                cohortEntry.getClientVersion(), ""));
+                                cohortEntry.getClientVersion()));
         }
 
                     return newModifications.getLast();
@@ -577,7 +575,7 @@ final class ShardCommitCoordinator {
      * @return the current CohortEntry or null if the given transaction ID does not match the
      *         current entry.
      */
-    CohortEntry getCohortEntryIfCurrent(String transactionID) {
+    CohortEntry getCohortEntryIfCurrent(Identifier transactionID) {
         if(isCurrentTransaction(transactionID)) {
             return currentCohortEntry;
         }
@@ -589,11 +587,11 @@ final class ShardCommitCoordinator {
         return currentCohortEntry;
     }
 
-    CohortEntry getAndRemoveCohortEntry(String transactionID) {
+    CohortEntry getAndRemoveCohortEntry(Identifier transactionID) {
         return cohortCache.remove(transactionID);
     }
 
-    boolean isCurrentTransaction(String transactionID) {
+    boolean isCurrentTransaction(Identifier transactionID) {
         return currentCohortEntry != null &&
                 currentCohortEntry.getTransactionID().equals(transactionID);
     }
@@ -607,7 +605,7 @@ final class ShardCommitCoordinator {
      * @param removeCohortEntry if true the CohortEntry for the transaction is also removed from
      *        the cache.
      */
-    void currentTransactionComplete(String transactionID, boolean removeCohortEntry) {
+    void currentTransactionComplete(Identifier transactionID, boolean removeCohortEntry) {
         if(removeCohortEntry) {
             cohortCache.remove(transactionID);
         }