X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShardCommitCoordinator.java;h=61de06a07e1efd90e5bf8eca0870a81376018a3f;hb=refs%2Fchanges%2F17%2F82217%2F1;hp=080d3eec23a22ef3f7cb9771d78f42e905d08dc1;hpb=df3361f6703078faff9c4b029d0021a63ebe6e6e;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java index 080d3eec23..61de06a07e 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java @@ -20,7 +20,6 @@ import java.util.Collection; import java.util.Collections; import java.util.Deque; import java.util.HashMap; -import java.util.Iterator; import java.util.LinkedList; import java.util.Map; import javax.annotation.Nonnull; @@ -103,7 +102,7 @@ final class ShardCommitCoordinator { log.debug("{}: Readying transaction {}, client version {}", name, ready.getTransactionId(), ready.getTxnClientVersion()); - final ShardDataTreeCohort cohort = ready.getTransaction().ready(); + final ShardDataTreeCohort cohort = ready.getTransaction().ready(ready.getParticipatingShardNames()); final CohortEntry cohortEntry = CohortEntry.createReady(cohort, ready.getTxnClientVersion()); cohortCache.put(cohortEntry.getTransactionId(), cohortEntry); @@ -130,7 +129,7 @@ final class ShardCommitCoordinator { */ void handleBatchedModifications(final BatchedModifications batched, final ActorRef sender, final Shard shard) { CohortEntry cohortEntry = cohortCache.get(batched.getTransactionId()); - if (cohortEntry == null) { + if (cohortEntry == null || cohortEntry.isSealed()) { cohortEntry = CohortEntry.createOpen(dataTree.newReadWriteTransaction(batched.getTransactionId()), batched.getVersion()); cohortCache.put(cohortEntry.getTransactionId(), cohortEntry); @@ -162,7 +161,7 @@ final class ShardCommitCoordinator { } cohortEntry.setDoImmediateCommit(batched.isDoCommitOnReady()); - cohortEntry.ready(cohortDecorator); + cohortEntry.ready(batched.getParticipatingShardNames(), cohortDecorator); if (batched.isDoCommitOnReady()) { cohortEntry.setReplySender(sender); @@ -187,7 +186,8 @@ final class ShardCommitCoordinator { */ void handleReadyLocalTransaction(final ReadyLocalTransaction message, final ActorRef sender, final Shard shard) { final TransactionIdentifier txId = message.getTransactionId(); - final ShardDataTreeCohort cohort = dataTree.newReadyCohort(txId, message.getModification()); + final ShardDataTreeCohort cohort = dataTree.newReadyCohort(txId, message.getModification(), + message.getParticipatingShardNames()); final CohortEntry cohortEntry = CohortEntry.createReady(cohort, DataStoreVersions.CURRENT_VERSION); cohortCache.put(cohortEntry.getTransactionId(), cohortEntry); cohortEntry.setDoImmediateCommit(message.isDoCommitOnReady()); @@ -227,7 +227,9 @@ final class ShardCommitCoordinator { BatchedModifications last = newModifications.getLast(); last.setDoCommitOnReady(from.isDoCommitOnReady()); - last.setReady(from.isReady()); + if (from.isReady()) { + last.setReady(from.getParticipatingShardNames()); + } last.setTotalMessagesSent(newModifications.size()); return newModifications; } @@ -249,8 +251,8 @@ final class ShardCommitCoordinator { @Override public void onFailure(final Throwable failure) { - log.debug("{}: An exception occurred during canCommit for {}: {}", name, - cohortEntry.getTransactionId(), failure); + log.debug("{}: An exception occurred during canCommit for {}", name, cohortEntry.getTransactionId(), + failure); cohortCache.remove(cohortEntry.getTransactionId()); cohortEntry.getReplySender().tell(new Failure(failure), cohortEntry.getShard().self()); @@ -274,7 +276,7 @@ final class ShardCommitCoordinator { // between canCommit and ready and the entry was expired from the cache or it was aborted. IllegalStateException ex = new IllegalStateException( String.format("%s: Cannot canCommit transaction %s - no cohort entry found", name, transactionID)); - log.error(ex.getMessage()); + log.error("{}: Inconsistency during transaction {} canCommit", name, transactionID, ex); sender.tell(new Failure(ex), shard.self()); return; } @@ -351,7 +353,7 @@ final class ShardCommitCoordinator { // or it was aborted. IllegalStateException ex = new IllegalStateException( String.format("%s: Cannot commit transaction %s - no cohort entry found", name, transactionID)); - log.error(ex.getMessage()); + log.error("{}: Inconsistency during transaction {} commit", name, transactionID, ex); sender.tell(new Failure(ex), shard.self()); return; } @@ -395,13 +397,7 @@ final class ShardCommitCoordinator { } void checkForExpiredTransactions(final long timeout, final Shard shard) { - Iterator iter = cohortCache.values().iterator(); - while (iter.hasNext()) { - CohortEntry cohortEntry = iter.next(); - if (cohortEntry.isFailed()) { - iter.remove(); - } - } + cohortCache.values().removeIf(CohortEntry::isFailed); } void abortPendingTransactions(final String reason, final Shard shard) { @@ -454,7 +450,7 @@ final class ShardCommitCoordinator { if (last != null) { final boolean immediate = cohortEntry.isDoImmediateCommit(); last.setDoCommitOnReady(immediate); - last.setReady(true); + last.setReady(cohortEntry.getParticipatingShardNames()); last.setTotalMessagesSent(newMessages.size()); messages.addAll(newMessages);