X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShardDataTree.java;h=78b49a60ae5cef444b28633fd9b8fe397d117914;hp=31198b98f2541c1e2cf1684d292e076d2edc90d3;hb=823bd74f34ee1c651f1f90daeef386a35c68d431;hpb=3ed71a2888cb0e38096d9bf15b78213948d1f328 diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java index 31198b98f2..78b49a60ae 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java @@ -36,6 +36,7 @@ import java.util.Queue; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.function.Consumer; import java.util.function.UnaryOperator; import javax.annotation.Nonnull; import javax.annotation.concurrent.NotThreadSafe; @@ -380,7 +381,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { } if (!current.cohort.getIdentifier().equals(txId)) { - LOG.warn("{}: Head of queue is {}, ignoring consensus on transaction {}", logContext, + LOG.debug("{}: Head of pendingFinishCommits queue is {}, ignoring consensus on transaction {}", logContext, current.cohort.getIdentifier(), txId); return; } @@ -560,21 +561,10 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { @SuppressWarnings("checkstyle:IllegalCatch") private void processNextPendingTransaction() { - while (!pendingTransactions.isEmpty()) { - final CommitEntry entry = pendingTransactions.peek(); + processNextPending(pendingTransactions, State.CAN_COMMIT_PENDING, entry -> { final SimpleShardDataTreeCohort cohort = entry.cohort; final DataTreeModification modification = cohort.getDataTreeModification(); - if (cohort.isFailed()) { - LOG.debug("{}: Removing failed transaction {}", logContext, cohort.getIdentifier()); - pendingTransactions.remove(); - continue; - } - - if (cohort.getState() != State.CAN_COMMIT_PENDING) { - break; - } - LOG.debug("{}: Validating transaction {}", logContext, cohort.getIdentifier()); Exception cause; try { @@ -603,24 +593,28 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { // Failure path: propagate the failure, remove the transaction from the queue and loop to the next one pendingTransactions.poll().cohort.failedCanCommit(cause); - } + }); + } - maybeRunOperationOnPendingTransactionsComplete(); + private void processNextPending() { + processNextPendingFinishCommit(); + processNextPendingCommit(); + processNextPendingTransaction(); } - private void processNextPendingCommit() { - while (!pendingCommits.isEmpty()) { - final CommitEntry entry = pendingCommits.peek(); + private void processNextPending(Queue queue, State allowedState, Consumer processor) { + while (!queue.isEmpty()) { + final CommitEntry entry = queue.peek(); final SimpleShardDataTreeCohort cohort = entry.cohort; if (cohort.isFailed()) { LOG.debug("{}: Removing failed transaction {}", logContext, cohort.getIdentifier()); - pendingCommits.remove(); + queue.remove(); continue; } - if (cohort.getState() == State.COMMIT_PENDING) { - startCommit(cohort, cohort.getCandidate()); + if (cohort.getState() == allowedState) { + processor.accept(entry); } break; @@ -629,16 +623,21 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { maybeRunOperationOnPendingTransactionsComplete(); } + private void processNextPendingCommit() { + processNextPending(pendingCommits, State.COMMIT_PENDING, + entry -> startCommit(entry.cohort, entry.cohort.getCandidate())); + } + + private void processNextPendingFinishCommit() { + processNextPending(pendingFinishCommits, State.FINISH_COMMIT_PENDING, + entry -> payloadReplicationComplete(entry.cohort.getIdentifier())); + } + private boolean peekNextPendingCommit() { final CommitEntry first = pendingCommits.peek(); return first != null && first.cohort.getState() == State.COMMIT_PENDING; } - private void processNextPending() { - processNextPendingCommit(); - processNextPendingTransaction(); - } - void startCanCommit(final SimpleShardDataTreeCohort cohort) { final SimpleShardDataTreeCohort current = pendingTransactions.peek().cohort; if (!cohort.equals(current)) { @@ -739,15 +738,16 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { LOG.debug("{}: Starting commit for transaction {}", logContext, current.getIdentifier()); + final TransactionIdentifier txId = cohort.getIdentifier(); if (shard.canSkipPayload() || candidate.getRootNode().getModificationType() == ModificationType.UNMODIFIED) { LOG.debug("{}: No replication required, proceeding to finish commit", logContext); pendingCommits.remove(); pendingFinishCommits.add(entry); - finishCommit(cohort); + cohort.finishCommitPending(); + payloadReplicationComplete(txId); return; } - final TransactionIdentifier txId = cohort.getIdentifier(); final Payload payload; try { payload = CommitTransactionPayload.create(txId, candidate);