BUG-5280: switch transactionIdentifier
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / ShardCommitCoordinator.java
index f313329c7070a1fd582bf045888321493ee4074a..3451934e25109aae47ef9b6612b47551338ab447 100644 (file)
@@ -34,6 +34,8 @@ import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransact
 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
 import org.opendaylight.controller.cluster.datastore.utils.AbstractBatchedModificationsCursor;
 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
+import org.opendaylight.yangtools.concepts.Identifier;
+import org.opendaylight.yangtools.util.StringIdentifier;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import org.slf4j.Logger;
 
@@ -46,10 +48,10 @@ final class ShardCommitCoordinator {
 
     // Interface hook for unit tests to replace or decorate the DOMStoreThreePhaseCommitCohorts.
     public interface CohortDecorator {
-        ShardDataTreeCohort decorate(String transactionID, ShardDataTreeCohort actual);
+        ShardDataTreeCohort decorate(Identifier transactionID, ShardDataTreeCohort actual);
     }
 
-    private final Map<String, CohortEntry> cohortCache = new HashMap<>();
+    private final Map<Identifier, CohortEntry> cohortCache = new HashMap<>();
 
     private CohortEntry currentCohortEntry;
 
@@ -143,7 +145,7 @@ final class ShardCommitCoordinator {
 
         final ShardDataTreeCohort cohort = ready.getTransaction().ready();
         final CohortEntry cohortEntry = new CohortEntry(ready.getTransactionID(), cohort, cohortRegistry, schema, ready.getTxnClientVersion());
-        cohortCache.put(ready.getTransactionID(), cohortEntry);
+        cohortCache.put(cohortEntry.getTransactionID(), cohortEntry);
 
         if(!queueCohortEntry(cohortEntry, sender, shard)) {
             return;
@@ -172,12 +174,12 @@ final class ShardCommitCoordinator {
      * @param shard the transaction's shard actor
      */
     void handleBatchedModifications(BatchedModifications batched, ActorRef sender, Shard shard, SchemaContext schema) {
-        CohortEntry cohortEntry = cohortCache.get(batched.getTransactionID());
+        CohortEntry cohortEntry = cohortCache.get(new StringIdentifier(batched.getTransactionID()));
         if(cohortEntry == null) {
             cohortEntry = new CohortEntry(batched.getTransactionID(),
                     dataTree.newReadWriteTransaction(batched.getTransactionID(), batched.getTransactionChainID()),
                     cohortRegistry, schema,  batched.getVersion());
-            cohortCache.put(batched.getTransactionID(), cohortEntry);
+            cohortCache.put(cohortEntry.getTransactionID(), cohortEntry);
         }
 
         if(log.isDebugEnabled()) {
@@ -238,7 +240,7 @@ final class ShardCommitCoordinator {
                 message.getTransactionID());
         final CohortEntry cohortEntry = new CohortEntry(message.getTransactionID(), cohort, cohortRegistry, schema,
                 DataStoreVersions.CURRENT_VERSION);
-        cohortCache.put(message.getTransactionID(), cohortEntry);
+        cohortCache.put(cohortEntry.getTransactionID(), cohortEntry);
         cohortEntry.setDoImmediateCommit(message.isDoCommitOnReady());
 
         if(!queueCohortEntry(cohortEntry, sender, shard)) {
@@ -258,7 +260,7 @@ final class ShardCommitCoordinator {
 
     Collection<BatchedModifications> createForwardedBatchedModifications(final BatchedModifications from,
             final int maxModificationsPerBatch) {
-        CohortEntry cohortEntry = getAndRemoveCohortEntry(from.getTransactionID());
+        CohortEntry cohortEntry = getAndRemoveCohortEntry(new StringIdentifier(from.getTransactionID()));
         if(cohortEntry == null || cohortEntry.getTransaction() == null) {
             return Collections.singletonList(from);
         }
@@ -287,8 +289,6 @@ final class ShardCommitCoordinator {
     }
 
     private void handleCanCommit(CohortEntry cohortEntry) {
-        String transactionID = cohortEntry.getTransactionID();
-
         cohortEntry.updateLastAccessTime();
 
         if(currentCohortEntry != null) {
@@ -297,7 +297,7 @@ final class ShardCommitCoordinator {
 
             if(log.isDebugEnabled()) {
                 log.debug("{}: Commit for Tx {} already in progress - skipping canCommit for {} for now",
-                        name, currentCohortEntry.getTransactionID(), transactionID);
+                        name, currentCohortEntry.getTransactionID(), cohortEntry.getTransactionID());
             }
 
             return;
@@ -313,7 +313,7 @@ final class ShardCommitCoordinator {
             if(log.isDebugEnabled()) {
                 log.debug("{}: Tx {} is the next pending canCommit - skipping {} for now", name,
                         queuedCohortEntries.peek() != null ? queuedCohortEntries.peek().getTransactionID() : "???",
-                                transactionID);
+                                cohortEntry.getTransactionID());
             }
         }
     }
@@ -325,7 +325,7 @@ final class ShardCommitCoordinator {
      * @param sender the actor to which to send the response
      * @param shard the transaction's shard actor
      */
-    void handleCanCommit(String transactionID, final ActorRef sender, final Shard shard) {
+    void handleCanCommit(Identifier transactionID, final ActorRef sender, final Shard shard) {
         // Lookup the cohort entry that was cached previously (or should have been) by
         // transactionReady (via the ForwardedReadyTransaction message).
         final CohortEntry cohortEntry = cohortCache.get(transactionID);
@@ -419,7 +419,7 @@ final class ShardCommitCoordinator {
      * @param shard the transaction's shard actor
      * @return true if the transaction was successfully prepared, false otherwise.
      */
-    boolean handleCommit(final String transactionID, final ActorRef sender, final Shard shard) {
+    boolean handleCommit(final Identifier transactionID, final ActorRef sender, final Shard shard) {
         // Get the current in-progress cohort entry in the commitCoordinator if it corresponds to
         // this transaction.
         final CohortEntry cohortEntry = getCohortEntryIfCurrent(transactionID);
@@ -438,7 +438,7 @@ final class ShardCommitCoordinator {
         return doCommit(cohortEntry);
     }
 
-    void handleAbort(final String transactionID, final ActorRef sender, final Shard shard) {
+    void handleAbort(final Identifier transactionID, final ActorRef sender, final Shard shard) {
         CohortEntry cohortEntry = getCohortEntryIfCurrent(transactionID);
         if(cohortEntry != null) {
             // We don't remove the cached cohort entry here (ie pass false) in case the Tx was
@@ -539,7 +539,7 @@ final class ShardCommitCoordinator {
                 protected BatchedModifications getModifications() {
                     if(newModifications.isEmpty() ||
                             newModifications.getLast().getModifications().size() >= maxModificationsPerBatch) {
-                        newModifications.add(new BatchedModifications(cohortEntry.getTransactionID(),
+                        newModifications.add(new BatchedModifications(cohortEntry.getTransactionID().getString(),
                                 cohortEntry.getClientVersion(), ""));
         }
 
@@ -555,12 +555,12 @@ final class ShardCommitCoordinator {
                 messages.addAll(newModifications);
 
                 if(!cohortEntry.isDoImmediateCommit() && cohortEntry.getState() == CohortEntry.State.CAN_COMMITTED) {
-                    messages.add(new CanCommitTransaction(cohortEntry.getTransactionID(),
+                    messages.add(new CanCommitTransaction(cohortEntry.getTransactionID().getString(),
                             cohortEntry.getClientVersion()));
                 }
 
                 if(!cohortEntry.isDoImmediateCommit() && cohortEntry.getState() == CohortEntry.State.PRE_COMMITTED) {
-                    messages.add(new CommitTransaction(cohortEntry.getTransactionID(),
+                    messages.add(new CommitTransaction(cohortEntry.getTransactionID().getString(),
                             cohortEntry.getClientVersion()));
                 }
             }
@@ -577,7 +577,7 @@ final class ShardCommitCoordinator {
      * @return the current CohortEntry or null if the given transaction ID does not match the
      *         current entry.
      */
-    CohortEntry getCohortEntryIfCurrent(String transactionID) {
+    CohortEntry getCohortEntryIfCurrent(Identifier transactionID) {
         if(isCurrentTransaction(transactionID)) {
             return currentCohortEntry;
         }
@@ -589,11 +589,11 @@ final class ShardCommitCoordinator {
         return currentCohortEntry;
     }
 
-    CohortEntry getAndRemoveCohortEntry(String transactionID) {
+    CohortEntry getAndRemoveCohortEntry(Identifier transactionID) {
         return cohortCache.remove(transactionID);
     }
 
-    boolean isCurrentTransaction(String transactionID) {
+    boolean isCurrentTransaction(Identifier transactionID) {
         return currentCohortEntry != null &&
                 currentCohortEntry.getTransactionID().equals(transactionID);
     }
@@ -607,7 +607,7 @@ final class ShardCommitCoordinator {
      * @param removeCohortEntry if true the CohortEntry for the transaction is also removed from
      *        the cache.
      */
-    void currentTransactionComplete(String transactionID, boolean removeCohortEntry) {
+    void currentTransactionComplete(Identifier transactionID, boolean removeCohortEntry) {
         if(removeCohortEntry) {
             cohortCache.remove(transactionID);
         }