X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShardCommitCoordinator.java;h=eebad9ce065c4fbb98e3f1a7c53ecb122c9cb5a0;hb=bf95823bd121f75944e9a248731aee240f12fe5c;hp=bf5d271bfc418a87288f2760b1d6d73233ae9e30;hpb=4d7d88d74ee1177774fad5bd31ceaec2cee3056c;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java index bf5d271bfc..eebad9ce06 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java @@ -20,7 +20,6 @@ import java.util.Collection; import java.util.Collections; import java.util.Deque; import java.util.HashMap; -import java.util.Iterator; import java.util.LinkedList; import java.util.Map; import javax.annotation.Nonnull; @@ -103,7 +102,7 @@ final class ShardCommitCoordinator { log.debug("{}: Readying transaction {}, client version {}", name, ready.getTransactionId(), ready.getTxnClientVersion()); - final ShardDataTreeCohort cohort = ready.getTransaction().ready(); + final ShardDataTreeCohort cohort = ready.getTransaction().ready(ready.getParticipatingShardNames()); final CohortEntry cohortEntry = CohortEntry.createReady(cohort, ready.getTxnClientVersion()); cohortCache.put(cohortEntry.getTransactionId(), cohortEntry); @@ -162,7 +161,7 @@ final class ShardCommitCoordinator { } cohortEntry.setDoImmediateCommit(batched.isDoCommitOnReady()); - cohortEntry.ready(cohortDecorator); + cohortEntry.ready(batched.getParticipatingShardNames(), cohortDecorator); if (batched.isDoCommitOnReady()) { cohortEntry.setReplySender(sender); @@ -187,7 +186,8 @@ final class ShardCommitCoordinator { */ void handleReadyLocalTransaction(final ReadyLocalTransaction message, final ActorRef sender, final Shard shard) { final TransactionIdentifier txId = message.getTransactionId(); - final ShardDataTreeCohort cohort = dataTree.newReadyCohort(txId, message.getModification()); + final ShardDataTreeCohort cohort = dataTree.newReadyCohort(txId, message.getModification(), + message.getParticipatingShardNames()); final CohortEntry cohortEntry = CohortEntry.createReady(cohort, DataStoreVersions.CURRENT_VERSION); cohortCache.put(cohortEntry.getTransactionId(), cohortEntry); cohortEntry.setDoImmediateCommit(message.isDoCommitOnReady()); @@ -227,7 +227,9 @@ final class ShardCommitCoordinator { BatchedModifications last = newModifications.getLast(); last.setDoCommitOnReady(from.isDoCommitOnReady()); - last.setReady(from.isReady()); + if (from.isReady()) { + last.setReady(from.getParticipatingShardNames()); + } last.setTotalMessagesSent(newModifications.size()); return newModifications; } @@ -318,6 +320,7 @@ final class ShardCommitCoordinator { final TransactionIdentifier txId = cohortEntry.getTransactionId(); log.debug("{}: Transaction {} committed as {}, sending response to {}", persistenceId(), txId, result, sender); + cohortEntry.getShard().getDataStore().purgeTransaction(txId, null); cohortCache.remove(cohortEntry.getTransactionId()); sender.tell(CommitTransactionReply.instance(cohortEntry.getClientVersion()).toSerializable(), @@ -326,8 +329,9 @@ final class ShardCommitCoordinator { @Override public void onFailure(final Throwable failure) { - log.error("{}, An exception occurred while committing transaction {}", persistenceId(), - cohortEntry.getTransactionId(), failure); + final TransactionIdentifier txId = cohortEntry.getTransactionId(); + log.error("{}, An exception occurred while committing transaction {}", persistenceId(), txId, failure); + cohortEntry.getShard().getDataStore().purgeTransaction(txId, null); cohortCache.remove(cohortEntry.getTransactionId()); sender.tell(new Failure(failure), cohortEntry.getShard().self()); @@ -371,6 +375,8 @@ final class ShardCommitCoordinator { cohortEntry.abort(new FutureCallback() { @Override public void onSuccess(final Void result) { + shard.getDataStore().purgeTransaction(cohortEntry.getTransactionId(), null); + if (sender != null) { sender.tell(AbortTransactionReply.instance(cohortEntry.getClientVersion()).toSerializable(), self); } @@ -379,6 +385,7 @@ final class ShardCommitCoordinator { @Override public void onFailure(final Throwable failure) { log.error("{}: An exception happened during abort", name, failure); + shard.getDataStore().purgeTransaction(cohortEntry.getTransactionId(), null); if (sender != null) { sender.tell(new Failure(failure), self); @@ -390,13 +397,7 @@ final class ShardCommitCoordinator { } void checkForExpiredTransactions(final long timeout, final Shard shard) { - Iterator iter = cohortCache.values().iterator(); - while (iter.hasNext()) { - CohortEntry cohortEntry = iter.next(); - if (cohortEntry.isFailed()) { - iter.remove(); - } - } + cohortCache.values().removeIf(CohortEntry::isFailed); } void abortPendingTransactions(final String reason, final Shard shard) { @@ -449,7 +450,7 @@ final class ShardCommitCoordinator { if (last != null) { final boolean immediate = cohortEntry.isDoImmediateCommit(); last.setDoCommitOnReady(immediate); - last.setReady(true); + last.setReady(cohortEntry.getParticipatingShardNames()); last.setTotalMessagesSent(newMessages.size()); messages.addAll(newMessages);