X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShardCommitCoordinator.java;h=57854e87339ab5af7578f5a05a47e72ce8c04933;hb=a6ffbcf79601fb8dc03e90300711acb63ed674b6;hp=bf5d271bfc418a87288f2760b1d6d73233ae9e30;hpb=4d7d88d74ee1177774fad5bd31ceaec2cee3056c;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java index bf5d271bfc..57854e8733 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java @@ -20,7 +20,6 @@ import java.util.Collection; import java.util.Collections; import java.util.Deque; import java.util.HashMap; -import java.util.Iterator; import java.util.LinkedList; import java.util.Map; import javax.annotation.Nonnull; @@ -103,7 +102,7 @@ final class ShardCommitCoordinator { log.debug("{}: Readying transaction {}, client version {}", name, ready.getTransactionId(), ready.getTxnClientVersion()); - final ShardDataTreeCohort cohort = ready.getTransaction().ready(); + final ShardDataTreeCohort cohort = ready.getTransaction().ready(ready.getParticipatingShardNames()); final CohortEntry cohortEntry = CohortEntry.createReady(cohort, ready.getTxnClientVersion()); cohortCache.put(cohortEntry.getTransactionId(), cohortEntry); @@ -130,7 +129,7 @@ final class ShardCommitCoordinator { */ void handleBatchedModifications(final BatchedModifications batched, final ActorRef sender, final Shard shard) { CohortEntry cohortEntry = cohortCache.get(batched.getTransactionId()); - if (cohortEntry == null) { + if (cohortEntry == null || cohortEntry.isSealed()) { cohortEntry = CohortEntry.createOpen(dataTree.newReadWriteTransaction(batched.getTransactionId()), batched.getVersion()); cohortCache.put(cohortEntry.getTransactionId(), cohortEntry); @@ -157,12 +156,12 @@ final class ShardCommitCoordinator { } if (log.isDebugEnabled()) { - log.debug("{}: Readying Tx {}, client version {}", name, - batched.getTransactionId(), batched.getVersion()); + log.debug("{}: Readying Tx {} of {} operations, client version {}", name, + batched.getTransactionId(), cohortEntry.getTotalOperationsProcessed(), batched.getVersion()); } cohortEntry.setDoImmediateCommit(batched.isDoCommitOnReady()); - cohortEntry.ready(cohortDecorator); + cohortEntry.ready(batched.getParticipatingShardNames(), cohortDecorator); if (batched.isDoCommitOnReady()) { cohortEntry.setReplySender(sender); @@ -187,7 +186,8 @@ final class ShardCommitCoordinator { */ void handleReadyLocalTransaction(final ReadyLocalTransaction message, final ActorRef sender, final Shard shard) { final TransactionIdentifier txId = message.getTransactionId(); - final ShardDataTreeCohort cohort = dataTree.newReadyCohort(txId, message.getModification()); + final ShardDataTreeCohort cohort = dataTree.newReadyCohort(txId, message.getModification(), + message.getParticipatingShardNames()); final CohortEntry cohortEntry = CohortEntry.createReady(cohort, DataStoreVersions.CURRENT_VERSION); cohortCache.put(cohortEntry.getTransactionId(), cohortEntry); cohortEntry.setDoImmediateCommit(message.isDoCommitOnReady()); @@ -227,7 +227,9 @@ final class ShardCommitCoordinator { BatchedModifications last = newModifications.getLast(); last.setDoCommitOnReady(from.isDoCommitOnReady()); - last.setReady(from.isReady()); + if (from.isReady()) { + last.setReady(from.getParticipatingShardNames()); + } last.setTotalMessagesSent(newModifications.size()); return newModifications; } @@ -249,8 +251,8 @@ final class ShardCommitCoordinator { @Override public void onFailure(final Throwable failure) { - log.debug("{}: An exception occurred during canCommit for {}: {}", name, - cohortEntry.getTransactionId(), failure); + log.debug("{}: An exception occurred during canCommit for {}", name, cohortEntry.getTransactionId(), + failure); cohortCache.remove(cohortEntry.getTransactionId()); cohortEntry.getReplySender().tell(new Failure(failure), cohortEntry.getShard().self()); @@ -274,7 +276,7 @@ final class ShardCommitCoordinator { // between canCommit and ready and the entry was expired from the cache or it was aborted. IllegalStateException ex = new IllegalStateException( String.format("%s: Cannot canCommit transaction %s - no cohort entry found", name, transactionID)); - log.error(ex.getMessage()); + log.error("{}: Inconsistency during transaction {} canCommit", name, transactionID, ex); sender.tell(new Failure(ex), shard.self()); return; } @@ -326,8 +328,8 @@ final class ShardCommitCoordinator { @Override public void onFailure(final Throwable failure) { - log.error("{}, An exception occurred while committing transaction {}", persistenceId(), - cohortEntry.getTransactionId(), failure); + final TransactionIdentifier txId = cohortEntry.getTransactionId(); + log.error("{}, An exception occurred while committing transaction {}", persistenceId(), txId, failure); cohortCache.remove(cohortEntry.getTransactionId()); sender.tell(new Failure(failure), cohortEntry.getShard().self()); @@ -349,7 +351,7 @@ final class ShardCommitCoordinator { // or it was aborted. IllegalStateException ex = new IllegalStateException( String.format("%s: Cannot commit transaction %s - no cohort entry found", name, transactionID)); - log.error(ex.getMessage()); + log.error("{}: Inconsistency during transaction {} commit", name, transactionID, ex); sender.tell(new Failure(ex), shard.self()); return; } @@ -390,13 +392,7 @@ final class ShardCommitCoordinator { } void checkForExpiredTransactions(final long timeout, final Shard shard) { - Iterator iter = cohortCache.values().iterator(); - while (iter.hasNext()) { - CohortEntry cohortEntry = iter.next(); - if (cohortEntry.isFailed()) { - iter.remove(); - } - } + cohortCache.values().removeIf(CohortEntry::isFailed); } void abortPendingTransactions(final String reason, final Shard shard) { @@ -449,7 +445,7 @@ final class ShardCommitCoordinator { if (last != null) { final boolean immediate = cohortEntry.isDoImmediateCommit(); last.setDoCommitOnReady(immediate); - last.setReady(true); + last.setReady(cohortEntry.getParticipatingShardNames()); last.setTotalMessagesSent(newMessages.size()); messages.addAll(newMessages);