X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShardCommitCoordinator.java;h=3451934e25109aae47ef9b6612b47551338ab447;hp=f313329c7070a1fd582bf045888321493ee4074a;hb=348a37f613ef444b10a0e65b400390396552fc48;hpb=95edb9370d26c8759ad64474c774cd4ddfe59926 diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java index f313329c70..3451934e25 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java @@ -34,6 +34,8 @@ import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransact import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply; import org.opendaylight.controller.cluster.datastore.utils.AbstractBatchedModificationsCursor; import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException; +import org.opendaylight.yangtools.concepts.Identifier; +import org.opendaylight.yangtools.util.StringIdentifier; import org.opendaylight.yangtools.yang.model.api.SchemaContext; import org.slf4j.Logger; @@ -46,10 +48,10 @@ final class ShardCommitCoordinator { // Interface hook for unit tests to replace or decorate the DOMStoreThreePhaseCommitCohorts. public interface CohortDecorator { - ShardDataTreeCohort decorate(String transactionID, ShardDataTreeCohort actual); + ShardDataTreeCohort decorate(Identifier transactionID, ShardDataTreeCohort actual); } - private final Map cohortCache = new HashMap<>(); + private final Map cohortCache = new HashMap<>(); private CohortEntry currentCohortEntry; @@ -143,7 +145,7 @@ final class ShardCommitCoordinator { final ShardDataTreeCohort cohort = ready.getTransaction().ready(); final CohortEntry cohortEntry = new CohortEntry(ready.getTransactionID(), cohort, cohortRegistry, schema, ready.getTxnClientVersion()); - cohortCache.put(ready.getTransactionID(), cohortEntry); + cohortCache.put(cohortEntry.getTransactionID(), cohortEntry); if(!queueCohortEntry(cohortEntry, sender, shard)) { return; @@ -172,12 +174,12 @@ final class ShardCommitCoordinator { * @param shard the transaction's shard actor */ void handleBatchedModifications(BatchedModifications batched, ActorRef sender, Shard shard, SchemaContext schema) { - CohortEntry cohortEntry = cohortCache.get(batched.getTransactionID()); + CohortEntry cohortEntry = cohortCache.get(new StringIdentifier(batched.getTransactionID())); if(cohortEntry == null) { cohortEntry = new CohortEntry(batched.getTransactionID(), dataTree.newReadWriteTransaction(batched.getTransactionID(), batched.getTransactionChainID()), cohortRegistry, schema, batched.getVersion()); - cohortCache.put(batched.getTransactionID(), cohortEntry); + cohortCache.put(cohortEntry.getTransactionID(), cohortEntry); } if(log.isDebugEnabled()) { @@ -238,7 +240,7 @@ final class ShardCommitCoordinator { message.getTransactionID()); final CohortEntry cohortEntry = new CohortEntry(message.getTransactionID(), cohort, cohortRegistry, schema, DataStoreVersions.CURRENT_VERSION); - cohortCache.put(message.getTransactionID(), cohortEntry); + cohortCache.put(cohortEntry.getTransactionID(), cohortEntry); cohortEntry.setDoImmediateCommit(message.isDoCommitOnReady()); if(!queueCohortEntry(cohortEntry, sender, shard)) { @@ -258,7 +260,7 @@ final class ShardCommitCoordinator { Collection createForwardedBatchedModifications(final BatchedModifications from, final int maxModificationsPerBatch) { - CohortEntry cohortEntry = getAndRemoveCohortEntry(from.getTransactionID()); + CohortEntry cohortEntry = getAndRemoveCohortEntry(new StringIdentifier(from.getTransactionID())); if(cohortEntry == null || cohortEntry.getTransaction() == null) { return Collections.singletonList(from); } @@ -287,8 +289,6 @@ final class ShardCommitCoordinator { } private void handleCanCommit(CohortEntry cohortEntry) { - String transactionID = cohortEntry.getTransactionID(); - cohortEntry.updateLastAccessTime(); if(currentCohortEntry != null) { @@ -297,7 +297,7 @@ final class ShardCommitCoordinator { if(log.isDebugEnabled()) { log.debug("{}: Commit for Tx {} already in progress - skipping canCommit for {} for now", - name, currentCohortEntry.getTransactionID(), transactionID); + name, currentCohortEntry.getTransactionID(), cohortEntry.getTransactionID()); } return; @@ -313,7 +313,7 @@ final class ShardCommitCoordinator { if(log.isDebugEnabled()) { log.debug("{}: Tx {} is the next pending canCommit - skipping {} for now", name, queuedCohortEntries.peek() != null ? queuedCohortEntries.peek().getTransactionID() : "???", - transactionID); + cohortEntry.getTransactionID()); } } } @@ -325,7 +325,7 @@ final class ShardCommitCoordinator { * @param sender the actor to which to send the response * @param shard the transaction's shard actor */ - void handleCanCommit(String transactionID, final ActorRef sender, final Shard shard) { + void handleCanCommit(Identifier transactionID, final ActorRef sender, final Shard shard) { // Lookup the cohort entry that was cached previously (or should have been) by // transactionReady (via the ForwardedReadyTransaction message). final CohortEntry cohortEntry = cohortCache.get(transactionID); @@ -419,7 +419,7 @@ final class ShardCommitCoordinator { * @param shard the transaction's shard actor * @return true if the transaction was successfully prepared, false otherwise. */ - boolean handleCommit(final String transactionID, final ActorRef sender, final Shard shard) { + boolean handleCommit(final Identifier transactionID, final ActorRef sender, final Shard shard) { // Get the current in-progress cohort entry in the commitCoordinator if it corresponds to // this transaction. final CohortEntry cohortEntry = getCohortEntryIfCurrent(transactionID); @@ -438,7 +438,7 @@ final class ShardCommitCoordinator { return doCommit(cohortEntry); } - void handleAbort(final String transactionID, final ActorRef sender, final Shard shard) { + void handleAbort(final Identifier transactionID, final ActorRef sender, final Shard shard) { CohortEntry cohortEntry = getCohortEntryIfCurrent(transactionID); if(cohortEntry != null) { // We don't remove the cached cohort entry here (ie pass false) in case the Tx was @@ -539,7 +539,7 @@ final class ShardCommitCoordinator { protected BatchedModifications getModifications() { if(newModifications.isEmpty() || newModifications.getLast().getModifications().size() >= maxModificationsPerBatch) { - newModifications.add(new BatchedModifications(cohortEntry.getTransactionID(), + newModifications.add(new BatchedModifications(cohortEntry.getTransactionID().getString(), cohortEntry.getClientVersion(), "")); } @@ -555,12 +555,12 @@ final class ShardCommitCoordinator { messages.addAll(newModifications); if(!cohortEntry.isDoImmediateCommit() && cohortEntry.getState() == CohortEntry.State.CAN_COMMITTED) { - messages.add(new CanCommitTransaction(cohortEntry.getTransactionID(), + messages.add(new CanCommitTransaction(cohortEntry.getTransactionID().getString(), cohortEntry.getClientVersion())); } if(!cohortEntry.isDoImmediateCommit() && cohortEntry.getState() == CohortEntry.State.PRE_COMMITTED) { - messages.add(new CommitTransaction(cohortEntry.getTransactionID(), + messages.add(new CommitTransaction(cohortEntry.getTransactionID().getString(), cohortEntry.getClientVersion())); } } @@ -577,7 +577,7 @@ final class ShardCommitCoordinator { * @return the current CohortEntry or null if the given transaction ID does not match the * current entry. */ - CohortEntry getCohortEntryIfCurrent(String transactionID) { + CohortEntry getCohortEntryIfCurrent(Identifier transactionID) { if(isCurrentTransaction(transactionID)) { return currentCohortEntry; } @@ -589,11 +589,11 @@ final class ShardCommitCoordinator { return currentCohortEntry; } - CohortEntry getAndRemoveCohortEntry(String transactionID) { + CohortEntry getAndRemoveCohortEntry(Identifier transactionID) { return cohortCache.remove(transactionID); } - boolean isCurrentTransaction(String transactionID) { + boolean isCurrentTransaction(Identifier transactionID) { return currentCohortEntry != null && currentCohortEntry.getTransactionID().equals(transactionID); } @@ -607,7 +607,7 @@ final class ShardCommitCoordinator { * @param removeCohortEntry if true the CohortEntry for the transaction is also removed from * the cache. */ - void currentTransactionComplete(String transactionID, boolean removeCohortEntry) { + void currentTransactionComplete(Identifier transactionID, boolean removeCohortEntry) { if(removeCohortEntry) { cohortCache.remove(transactionID); }