BUG-5280: refactor CohortEntry
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / ShardCommitCoordinator.java
index f313329c7070a1fd582bf045888321493ee4074a..eb0c04dbbd86eaaabde73326baf1b35086073ce1 100644 (file)
@@ -34,6 +34,7 @@ import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransact
 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
 import org.opendaylight.controller.cluster.datastore.utils.AbstractBatchedModificationsCursor;
 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
+import org.opendaylight.yangtools.concepts.Identifier;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import org.slf4j.Logger;
 
@@ -46,10 +47,10 @@ final class ShardCommitCoordinator {
 
     // Interface hook for unit tests to replace or decorate the DOMStoreThreePhaseCommitCohorts.
     public interface CohortDecorator {
-        ShardDataTreeCohort decorate(String transactionID, ShardDataTreeCohort actual);
+        ShardDataTreeCohort decorate(Identifier transactionID, ShardDataTreeCohort actual);
     }
 
-    private final Map<String, CohortEntry> cohortCache = new HashMap<>();
+    private final Map<Identifier, CohortEntry> cohortCache = new HashMap<>();
 
     private CohortEntry currentCohortEntry;
 
@@ -142,8 +143,9 @@ final class ShardCommitCoordinator {
                 ready.getTransactionID(), ready.getTxnClientVersion());
 
         final ShardDataTreeCohort cohort = ready.getTransaction().ready();
-        final CohortEntry cohortEntry = new CohortEntry(ready.getTransactionID(), cohort, cohortRegistry, schema, ready.getTxnClientVersion());
-        cohortCache.put(ready.getTransactionID(), cohortEntry);
+        final CohortEntry cohortEntry = CohortEntry.createReady(ready.getTransactionID(), cohort, cohortRegistry,
+            schema, ready.getTxnClientVersion());
+        cohortCache.put(cohortEntry.getTransactionID(), cohortEntry);
 
         if(!queueCohortEntry(cohortEntry, sender, shard)) {
             return;
@@ -169,15 +171,14 @@ final class ShardCommitCoordinator {
      *
      * @param batched the BatchedModifications message to process
      * @param sender the sender of the message
-     * @param shard the transaction's shard actor
      */
-    void handleBatchedModifications(BatchedModifications batched, ActorRef sender, Shard shard, SchemaContext schema) {
+    void handleBatchedModifications(BatchedModifications batched, ActorRef sender, Shard shard) {
         CohortEntry cohortEntry = cohortCache.get(batched.getTransactionID());
         if(cohortEntry == null) {
-            cohortEntry = new CohortEntry(batched.getTransactionID(),
-                    dataTree.newReadWriteTransaction(batched.getTransactionID(), batched.getTransactionChainID()),
-                    cohortRegistry, schema,  batched.getVersion());
-            cohortCache.put(batched.getTransactionID(), cohortEntry);
+            cohortEntry = CohortEntry.createOpen(batched.getTransactionID(),
+                    dataTree.newReadWriteTransaction(batched.getTransactionID()),
+                    cohortRegistry, dataTree.getSchemaContext(), batched.getVersion());
+            cohortCache.put(cohortEntry.getTransactionID(), cohortEntry);
         }
 
         if(log.isDebugEnabled()) {
@@ -232,13 +233,12 @@ final class ShardCommitCoordinator {
      * @param sender the sender of the message
      * @param shard the transaction's shard actor
      */
-    void handleReadyLocalTransaction(ReadyLocalTransaction message, ActorRef sender, Shard shard,
-            SchemaContext schema) {
+    void handleReadyLocalTransaction(ReadyLocalTransaction message, ActorRef sender, Shard shard) {
         final ShardDataTreeCohort cohort = new SimpleShardDataTreeCohort(dataTree, message.getModification(),
                 message.getTransactionID());
-        final CohortEntry cohortEntry = new CohortEntry(message.getTransactionID(), cohort, cohortRegistry, schema,
-                DataStoreVersions.CURRENT_VERSION);
-        cohortCache.put(message.getTransactionID(), cohortEntry);
+        final CohortEntry cohortEntry = CohortEntry.createReady(message.getTransactionID(), cohort, cohortRegistry,
+            dataTree.getSchemaContext(), DataStoreVersions.CURRENT_VERSION);
+        cohortCache.put(cohortEntry.getTransactionID(), cohortEntry);
         cohortEntry.setDoImmediateCommit(message.isDoCommitOnReady());
 
         if(!queueCohortEntry(cohortEntry, sender, shard)) {
@@ -271,8 +271,7 @@ final class ShardCommitCoordinator {
             protected BatchedModifications getModifications() {
                 if(newModifications.isEmpty() ||
                         newModifications.getLast().getModifications().size() >= maxModificationsPerBatch) {
-                    newModifications.add(new BatchedModifications(from.getTransactionID(),
-                            from.getVersion(), from.getTransactionChainID()));
+                    newModifications.add(new BatchedModifications(from.getTransactionID(), from.getVersion()));
                 }
 
                 return newModifications.getLast();
@@ -287,8 +286,6 @@ final class ShardCommitCoordinator {
     }
 
     private void handleCanCommit(CohortEntry cohortEntry) {
-        String transactionID = cohortEntry.getTransactionID();
-
         cohortEntry.updateLastAccessTime();
 
         if(currentCohortEntry != null) {
@@ -297,7 +294,7 @@ final class ShardCommitCoordinator {
 
             if(log.isDebugEnabled()) {
                 log.debug("{}: Commit for Tx {} already in progress - skipping canCommit for {} for now",
-                        name, currentCohortEntry.getTransactionID(), transactionID);
+                        name, currentCohortEntry.getTransactionID(), cohortEntry.getTransactionID());
             }
 
             return;
@@ -313,7 +310,7 @@ final class ShardCommitCoordinator {
             if(log.isDebugEnabled()) {
                 log.debug("{}: Tx {} is the next pending canCommit - skipping {} for now", name,
                         queuedCohortEntries.peek() != null ? queuedCohortEntries.peek().getTransactionID() : "???",
-                                transactionID);
+                                cohortEntry.getTransactionID());
             }
         }
     }
@@ -325,7 +322,7 @@ final class ShardCommitCoordinator {
      * @param sender the actor to which to send the response
      * @param shard the transaction's shard actor
      */
-    void handleCanCommit(String transactionID, final ActorRef sender, final Shard shard) {
+    void handleCanCommit(Identifier transactionID, final ActorRef sender, final Shard shard) {
         // Lookup the cohort entry that was cached previously (or should have been) by
         // transactionReady (via the ForwardedReadyTransaction message).
         final CohortEntry cohortEntry = cohortCache.get(transactionID);
@@ -419,7 +416,7 @@ final class ShardCommitCoordinator {
      * @param shard the transaction's shard actor
      * @return true if the transaction was successfully prepared, false otherwise.
      */
-    boolean handleCommit(final String transactionID, final ActorRef sender, final Shard shard) {
+    boolean handleCommit(final Identifier transactionID, final ActorRef sender, final Shard shard) {
         // Get the current in-progress cohort entry in the commitCoordinator if it corresponds to
         // this transaction.
         final CohortEntry cohortEntry = getCohortEntryIfCurrent(transactionID);
@@ -438,7 +435,7 @@ final class ShardCommitCoordinator {
         return doCommit(cohortEntry);
     }
 
-    void handleAbort(final String transactionID, final ActorRef sender, final Shard shard) {
+    void handleAbort(final Identifier transactionID, final ActorRef sender, final Shard shard) {
         CohortEntry cohortEntry = getCohortEntryIfCurrent(transactionID);
         if(cohortEntry != null) {
             // We don't remove the cached cohort entry here (ie pass false) in case the Tx was
@@ -540,7 +537,7 @@ final class ShardCommitCoordinator {
                     if(newModifications.isEmpty() ||
                             newModifications.getLast().getModifications().size() >= maxModificationsPerBatch) {
                         newModifications.add(new BatchedModifications(cohortEntry.getTransactionID(),
-                                cohortEntry.getClientVersion(), ""));
+                                cohortEntry.getClientVersion()));
         }
 
                     return newModifications.getLast();
@@ -577,7 +574,7 @@ final class ShardCommitCoordinator {
      * @return the current CohortEntry or null if the given transaction ID does not match the
      *         current entry.
      */
-    CohortEntry getCohortEntryIfCurrent(String transactionID) {
+    CohortEntry getCohortEntryIfCurrent(Identifier transactionID) {
         if(isCurrentTransaction(transactionID)) {
             return currentCohortEntry;
         }
@@ -589,11 +586,11 @@ final class ShardCommitCoordinator {
         return currentCohortEntry;
     }
 
-    CohortEntry getAndRemoveCohortEntry(String transactionID) {
+    CohortEntry getAndRemoveCohortEntry(Identifier transactionID) {
         return cohortCache.remove(transactionID);
     }
 
-    boolean isCurrentTransaction(String transactionID) {
+    boolean isCurrentTransaction(Identifier transactionID) {
         return currentCohortEntry != null &&
                 currentCohortEntry.getTransactionID().equals(transactionID);
     }
@@ -607,7 +604,7 @@ final class ShardCommitCoordinator {
      * @param removeCohortEntry if true the CohortEntry for the transaction is also removed from
      *        the cache.
      */
-    void currentTransactionComplete(String transactionID, boolean removeCohortEntry) {
+    void currentTransactionComplete(Identifier transactionID, boolean removeCohortEntry) {
         if(removeCohortEntry) {
             cohortCache.remove(transactionID);
         }