X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;ds=sidebyside;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShardCommitCoordinator.java;h=822586e9633a7085860c8191f57e767aa59bed6f;hb=175f38490b56c4b4e0ec356b17b91f887b295da4;hp=f313329c7070a1fd582bf045888321493ee4074a;hpb=b90ef95072662f8a8a479b6a4bd49376a54e9fb7;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java index f313329c70..822586e963 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java @@ -34,6 +34,7 @@ import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransact import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply; import org.opendaylight.controller.cluster.datastore.utils.AbstractBatchedModificationsCursor; import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException; +import org.opendaylight.yangtools.concepts.Identifier; import org.opendaylight.yangtools.yang.model.api.SchemaContext; import org.slf4j.Logger; @@ -46,10 +47,10 @@ final class ShardCommitCoordinator { // Interface hook for unit tests to replace or decorate the DOMStoreThreePhaseCommitCohorts. public interface CohortDecorator { - ShardDataTreeCohort decorate(String transactionID, ShardDataTreeCohort actual); + ShardDataTreeCohort decorate(Identifier transactionID, ShardDataTreeCohort actual); } - private final Map cohortCache = new HashMap<>(); + private final Map cohortCache = new HashMap<>(); private CohortEntry currentCohortEntry; @@ -143,7 +144,7 @@ final class ShardCommitCoordinator { final ShardDataTreeCohort cohort = ready.getTransaction().ready(); final CohortEntry cohortEntry = new CohortEntry(ready.getTransactionID(), cohort, cohortRegistry, schema, ready.getTxnClientVersion()); - cohortCache.put(ready.getTransactionID(), cohortEntry); + cohortCache.put(cohortEntry.getTransactionID(), cohortEntry); if(!queueCohortEntry(cohortEntry, sender, shard)) { return; @@ -175,9 +176,9 @@ final class ShardCommitCoordinator { CohortEntry cohortEntry = cohortCache.get(batched.getTransactionID()); if(cohortEntry == null) { cohortEntry = new CohortEntry(batched.getTransactionID(), - dataTree.newReadWriteTransaction(batched.getTransactionID(), batched.getTransactionChainID()), + dataTree.newReadWriteTransaction(batched.getTransactionID()), cohortRegistry, schema, batched.getVersion()); - cohortCache.put(batched.getTransactionID(), cohortEntry); + cohortCache.put(cohortEntry.getTransactionID(), cohortEntry); } if(log.isDebugEnabled()) { @@ -238,7 +239,7 @@ final class ShardCommitCoordinator { message.getTransactionID()); final CohortEntry cohortEntry = new CohortEntry(message.getTransactionID(), cohort, cohortRegistry, schema, DataStoreVersions.CURRENT_VERSION); - cohortCache.put(message.getTransactionID(), cohortEntry); + cohortCache.put(cohortEntry.getTransactionID(), cohortEntry); cohortEntry.setDoImmediateCommit(message.isDoCommitOnReady()); if(!queueCohortEntry(cohortEntry, sender, shard)) { @@ -271,8 +272,7 @@ final class ShardCommitCoordinator { protected BatchedModifications getModifications() { if(newModifications.isEmpty() || newModifications.getLast().getModifications().size() >= maxModificationsPerBatch) { - newModifications.add(new BatchedModifications(from.getTransactionID(), - from.getVersion(), from.getTransactionChainID())); + newModifications.add(new BatchedModifications(from.getTransactionID(), from.getVersion())); } return newModifications.getLast(); @@ -287,8 +287,6 @@ final class ShardCommitCoordinator { } private void handleCanCommit(CohortEntry cohortEntry) { - String transactionID = cohortEntry.getTransactionID(); - cohortEntry.updateLastAccessTime(); if(currentCohortEntry != null) { @@ -297,7 +295,7 @@ final class ShardCommitCoordinator { if(log.isDebugEnabled()) { log.debug("{}: Commit for Tx {} already in progress - skipping canCommit for {} for now", - name, currentCohortEntry.getTransactionID(), transactionID); + name, currentCohortEntry.getTransactionID(), cohortEntry.getTransactionID()); } return; @@ -313,7 +311,7 @@ final class ShardCommitCoordinator { if(log.isDebugEnabled()) { log.debug("{}: Tx {} is the next pending canCommit - skipping {} for now", name, queuedCohortEntries.peek() != null ? queuedCohortEntries.peek().getTransactionID() : "???", - transactionID); + cohortEntry.getTransactionID()); } } } @@ -325,7 +323,7 @@ final class ShardCommitCoordinator { * @param sender the actor to which to send the response * @param shard the transaction's shard actor */ - void handleCanCommit(String transactionID, final ActorRef sender, final Shard shard) { + void handleCanCommit(Identifier transactionID, final ActorRef sender, final Shard shard) { // Lookup the cohort entry that was cached previously (or should have been) by // transactionReady (via the ForwardedReadyTransaction message). final CohortEntry cohortEntry = cohortCache.get(transactionID); @@ -419,7 +417,7 @@ final class ShardCommitCoordinator { * @param shard the transaction's shard actor * @return true if the transaction was successfully prepared, false otherwise. */ - boolean handleCommit(final String transactionID, final ActorRef sender, final Shard shard) { + boolean handleCommit(final Identifier transactionID, final ActorRef sender, final Shard shard) { // Get the current in-progress cohort entry in the commitCoordinator if it corresponds to // this transaction. final CohortEntry cohortEntry = getCohortEntryIfCurrent(transactionID); @@ -438,7 +436,7 @@ final class ShardCommitCoordinator { return doCommit(cohortEntry); } - void handleAbort(final String transactionID, final ActorRef sender, final Shard shard) { + void handleAbort(final Identifier transactionID, final ActorRef sender, final Shard shard) { CohortEntry cohortEntry = getCohortEntryIfCurrent(transactionID); if(cohortEntry != null) { // We don't remove the cached cohort entry here (ie pass false) in case the Tx was @@ -540,7 +538,7 @@ final class ShardCommitCoordinator { if(newModifications.isEmpty() || newModifications.getLast().getModifications().size() >= maxModificationsPerBatch) { newModifications.add(new BatchedModifications(cohortEntry.getTransactionID(), - cohortEntry.getClientVersion(), "")); + cohortEntry.getClientVersion())); } return newModifications.getLast(); @@ -577,7 +575,7 @@ final class ShardCommitCoordinator { * @return the current CohortEntry or null if the given transaction ID does not match the * current entry. */ - CohortEntry getCohortEntryIfCurrent(String transactionID) { + CohortEntry getCohortEntryIfCurrent(Identifier transactionID) { if(isCurrentTransaction(transactionID)) { return currentCohortEntry; } @@ -589,11 +587,11 @@ final class ShardCommitCoordinator { return currentCohortEntry; } - CohortEntry getAndRemoveCohortEntry(String transactionID) { + CohortEntry getAndRemoveCohortEntry(Identifier transactionID) { return cohortCache.remove(transactionID); } - boolean isCurrentTransaction(String transactionID) { + boolean isCurrentTransaction(Identifier transactionID) { return currentCohortEntry != null && currentCohortEntry.getTransactionID().equals(transactionID); } @@ -607,7 +605,7 @@ final class ShardCommitCoordinator { * @param removeCohortEntry if true the CohortEntry for the transaction is also removed from * the cache. */ - void currentTransactionComplete(String transactionID, boolean removeCohortEntry) { + void currentTransactionComplete(Identifier transactionID, boolean removeCohortEntry) { if(removeCohortEntry) { cohortCache.remove(transactionID); }