X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShardCommitCoordinator.java;h=eb0c04dbbd86eaaabde73326baf1b35086073ce1;hb=refs%2Fchanges%2F28%2F41428%2F3;hp=f313329c7070a1fd582bf045888321493ee4074a;hpb=b90ef95072662f8a8a479b6a4bd49376a54e9fb7;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java index f313329c70..eb0c04dbbd 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java @@ -34,6 +34,7 @@ import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransact import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply; import org.opendaylight.controller.cluster.datastore.utils.AbstractBatchedModificationsCursor; import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException; +import org.opendaylight.yangtools.concepts.Identifier; import org.opendaylight.yangtools.yang.model.api.SchemaContext; import org.slf4j.Logger; @@ -46,10 +47,10 @@ final class ShardCommitCoordinator { // Interface hook for unit tests to replace or decorate the DOMStoreThreePhaseCommitCohorts. public interface CohortDecorator { - ShardDataTreeCohort decorate(String transactionID, ShardDataTreeCohort actual); + ShardDataTreeCohort decorate(Identifier transactionID, ShardDataTreeCohort actual); } - private final Map cohortCache = new HashMap<>(); + private final Map cohortCache = new HashMap<>(); private CohortEntry currentCohortEntry; @@ -142,8 +143,9 @@ final class ShardCommitCoordinator { ready.getTransactionID(), ready.getTxnClientVersion()); final ShardDataTreeCohort cohort = ready.getTransaction().ready(); - final CohortEntry cohortEntry = new CohortEntry(ready.getTransactionID(), cohort, cohortRegistry, schema, ready.getTxnClientVersion()); - cohortCache.put(ready.getTransactionID(), cohortEntry); + final CohortEntry cohortEntry = CohortEntry.createReady(ready.getTransactionID(), cohort, cohortRegistry, + schema, ready.getTxnClientVersion()); + cohortCache.put(cohortEntry.getTransactionID(), cohortEntry); if(!queueCohortEntry(cohortEntry, sender, shard)) { return; @@ -169,15 +171,14 @@ final class ShardCommitCoordinator { * * @param batched the BatchedModifications message to process * @param sender the sender of the message - * @param shard the transaction's shard actor */ - void handleBatchedModifications(BatchedModifications batched, ActorRef sender, Shard shard, SchemaContext schema) { + void handleBatchedModifications(BatchedModifications batched, ActorRef sender, Shard shard) { CohortEntry cohortEntry = cohortCache.get(batched.getTransactionID()); if(cohortEntry == null) { - cohortEntry = new CohortEntry(batched.getTransactionID(), - dataTree.newReadWriteTransaction(batched.getTransactionID(), batched.getTransactionChainID()), - cohortRegistry, schema, batched.getVersion()); - cohortCache.put(batched.getTransactionID(), cohortEntry); + cohortEntry = CohortEntry.createOpen(batched.getTransactionID(), + dataTree.newReadWriteTransaction(batched.getTransactionID()), + cohortRegistry, dataTree.getSchemaContext(), batched.getVersion()); + cohortCache.put(cohortEntry.getTransactionID(), cohortEntry); } if(log.isDebugEnabled()) { @@ -232,13 +233,12 @@ final class ShardCommitCoordinator { * @param sender the sender of the message * @param shard the transaction's shard actor */ - void handleReadyLocalTransaction(ReadyLocalTransaction message, ActorRef sender, Shard shard, - SchemaContext schema) { + void handleReadyLocalTransaction(ReadyLocalTransaction message, ActorRef sender, Shard shard) { final ShardDataTreeCohort cohort = new SimpleShardDataTreeCohort(dataTree, message.getModification(), message.getTransactionID()); - final CohortEntry cohortEntry = new CohortEntry(message.getTransactionID(), cohort, cohortRegistry, schema, - DataStoreVersions.CURRENT_VERSION); - cohortCache.put(message.getTransactionID(), cohortEntry); + final CohortEntry cohortEntry = CohortEntry.createReady(message.getTransactionID(), cohort, cohortRegistry, + dataTree.getSchemaContext(), DataStoreVersions.CURRENT_VERSION); + cohortCache.put(cohortEntry.getTransactionID(), cohortEntry); cohortEntry.setDoImmediateCommit(message.isDoCommitOnReady()); if(!queueCohortEntry(cohortEntry, sender, shard)) { @@ -271,8 +271,7 @@ final class ShardCommitCoordinator { protected BatchedModifications getModifications() { if(newModifications.isEmpty() || newModifications.getLast().getModifications().size() >= maxModificationsPerBatch) { - newModifications.add(new BatchedModifications(from.getTransactionID(), - from.getVersion(), from.getTransactionChainID())); + newModifications.add(new BatchedModifications(from.getTransactionID(), from.getVersion())); } return newModifications.getLast(); @@ -287,8 +286,6 @@ final class ShardCommitCoordinator { } private void handleCanCommit(CohortEntry cohortEntry) { - String transactionID = cohortEntry.getTransactionID(); - cohortEntry.updateLastAccessTime(); if(currentCohortEntry != null) { @@ -297,7 +294,7 @@ final class ShardCommitCoordinator { if(log.isDebugEnabled()) { log.debug("{}: Commit for Tx {} already in progress - skipping canCommit for {} for now", - name, currentCohortEntry.getTransactionID(), transactionID); + name, currentCohortEntry.getTransactionID(), cohortEntry.getTransactionID()); } return; @@ -313,7 +310,7 @@ final class ShardCommitCoordinator { if(log.isDebugEnabled()) { log.debug("{}: Tx {} is the next pending canCommit - skipping {} for now", name, queuedCohortEntries.peek() != null ? queuedCohortEntries.peek().getTransactionID() : "???", - transactionID); + cohortEntry.getTransactionID()); } } } @@ -325,7 +322,7 @@ final class ShardCommitCoordinator { * @param sender the actor to which to send the response * @param shard the transaction's shard actor */ - void handleCanCommit(String transactionID, final ActorRef sender, final Shard shard) { + void handleCanCommit(Identifier transactionID, final ActorRef sender, final Shard shard) { // Lookup the cohort entry that was cached previously (or should have been) by // transactionReady (via the ForwardedReadyTransaction message). final CohortEntry cohortEntry = cohortCache.get(transactionID); @@ -419,7 +416,7 @@ final class ShardCommitCoordinator { * @param shard the transaction's shard actor * @return true if the transaction was successfully prepared, false otherwise. */ - boolean handleCommit(final String transactionID, final ActorRef sender, final Shard shard) { + boolean handleCommit(final Identifier transactionID, final ActorRef sender, final Shard shard) { // Get the current in-progress cohort entry in the commitCoordinator if it corresponds to // this transaction. final CohortEntry cohortEntry = getCohortEntryIfCurrent(transactionID); @@ -438,7 +435,7 @@ final class ShardCommitCoordinator { return doCommit(cohortEntry); } - void handleAbort(final String transactionID, final ActorRef sender, final Shard shard) { + void handleAbort(final Identifier transactionID, final ActorRef sender, final Shard shard) { CohortEntry cohortEntry = getCohortEntryIfCurrent(transactionID); if(cohortEntry != null) { // We don't remove the cached cohort entry here (ie pass false) in case the Tx was @@ -540,7 +537,7 @@ final class ShardCommitCoordinator { if(newModifications.isEmpty() || newModifications.getLast().getModifications().size() >= maxModificationsPerBatch) { newModifications.add(new BatchedModifications(cohortEntry.getTransactionID(), - cohortEntry.getClientVersion(), "")); + cohortEntry.getClientVersion())); } return newModifications.getLast(); @@ -577,7 +574,7 @@ final class ShardCommitCoordinator { * @return the current CohortEntry or null if the given transaction ID does not match the * current entry. */ - CohortEntry getCohortEntryIfCurrent(String transactionID) { + CohortEntry getCohortEntryIfCurrent(Identifier transactionID) { if(isCurrentTransaction(transactionID)) { return currentCohortEntry; } @@ -589,11 +586,11 @@ final class ShardCommitCoordinator { return currentCohortEntry; } - CohortEntry getAndRemoveCohortEntry(String transactionID) { + CohortEntry getAndRemoveCohortEntry(Identifier transactionID) { return cohortCache.remove(transactionID); } - boolean isCurrentTransaction(String transactionID) { + boolean isCurrentTransaction(Identifier transactionID) { return currentCohortEntry != null && currentCohortEntry.getTransactionID().equals(transactionID); } @@ -607,7 +604,7 @@ final class ShardCommitCoordinator { * @param removeCohortEntry if true the CohortEntry for the transaction is also removed from * the cache. */ - void currentTransactionComplete(String transactionID, boolean removeCohortEntry) { + void currentTransactionComplete(Identifier transactionID, boolean removeCohortEntry) { if(removeCohortEntry) { cohortCache.remove(transactionID); }