X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShardDataTree.java;h=a155e5e0e049ab5afc19e2dc969bbcd775c0a494;hp=9aca517bb110d2f28381167962f2ad8f3de288af;hb=f81bccec7ac422dbcfdfba70dcfa22f9824b8e4c;hpb=cb90ab104f31168b0d5ff6c7a547c8cc1cc50bef diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java index 9aca517bb1..a155e5e0e0 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java @@ -20,6 +20,7 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMap.Builder; import com.google.common.collect.Iterables; import com.google.common.primitives.UnsignedLong; +import com.google.common.util.concurrent.FutureCallback; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import java.io.File; import java.io.IOException; @@ -31,7 +32,6 @@ import java.util.Iterator; import java.util.Map; import java.util.Map.Entry; import java.util.Queue; -import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.function.Consumer; @@ -808,7 +808,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { processNextPendingTransaction(); } - private void failPreCommit(final Exception cause) { + private void failPreCommit(final Throwable cause) { shard.getShardMBean().incrementFailedTransactionsCount(); pendingTransactions.poll().cohort.failedPreCommit(cause); processNextPendingTransaction(); @@ -827,25 +827,34 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { final DataTreeCandidateTip candidate; try { candidate = tip.prepare(cohort.getDataTreeModification()); - cohort.userPreCommit(candidate); - } catch (ExecutionException | TimeoutException | RuntimeException e) { + } catch (RuntimeException e) { failPreCommit(e); return; } - // Set the tip of the data tree. - tip = Verify.verifyNotNull(candidate); + cohort.userPreCommit(candidate, new FutureCallback() { + @Override + public void onSuccess(final Void noop) { + // Set the tip of the data tree. + tip = Verify.verifyNotNull(candidate); - entry.lastAccess = readTime(); + entry.lastAccess = readTime(); - pendingTransactions.remove(); - pendingCommits.add(entry); + pendingTransactions.remove(); + pendingCommits.add(entry); - LOG.debug("{}: Transaction {} prepared", logContext, current.getIdentifier()); + LOG.debug("{}: Transaction {} prepared", logContext, current.getIdentifier()); - cohort.successfulPreCommit(candidate); + cohort.successfulPreCommit(candidate); - processNextPendingTransaction(); + processNextPendingTransaction(); + } + + @Override + public void onFailure(final Throwable failure) { + failPreCommit(failure); + } + }); } private void failCommit(final Exception cause) { @@ -878,12 +887,12 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { shard.getShardMBean().setLastCommittedTransactionTime(System.currentTimeMillis()); // FIXME: propagate journal index - pendingFinishCommits.poll().cohort.successfulCommit(UnsignedLong.ZERO); + pendingFinishCommits.poll().cohort.successfulCommit(UnsignedLong.ZERO, () -> { + LOG.trace("{}: Transaction {} committed, proceeding to notify", logContext, txId); + notifyListeners(candidate); - LOG.trace("{}: Transaction {} committed, proceeding to notify", logContext, txId); - notifyListeners(candidate); - - processNextPending(); + processNextPending(); + }); } void startCommit(final SimpleShardDataTreeCohort cohort, final DataTreeCandidate candidate) { @@ -958,7 +967,8 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { @Override ShardDataTreeCohort createReadyCohort(final TransactionIdentifier txId, final DataTreeModification mod) { SimpleShardDataTreeCohort cohort = new SimpleShardDataTreeCohort(this, mod, txId, - cohortRegistry.createCohort(schemaContext, txId, COMMIT_STEP_TIMEOUT)); + cohortRegistry.createCohort(schemaContext, txId, runnable -> shard.executeInSelf(runnable), + COMMIT_STEP_TIMEOUT)); pendingTransactions.add(new CommitEntry(cohort, readTime())); return cohort; } @@ -1142,11 +1152,10 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { try { tip.validate(cohort.getDataTreeModification()); DataTreeCandidateTip candidate = tip.prepare(cohort.getDataTreeModification()); - cohort.userPreCommit(candidate); cohort.setNewCandidate(candidate); tip = candidate; - } catch (ExecutionException | TimeoutException | RuntimeException | DataValidationFailedException e) { + } catch (RuntimeException | DataValidationFailedException e) { LOG.debug("{}: Failed to reprepare queued transaction {}", logContext, cohort.getIdentifier(), e); cohort.reportFailure(e); }