X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-dom-broker%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fmd%2Fsal%2Fdom%2Fbroker%2Fimpl%2FCommitCoordinationTask.java;h=ea581ee89a958dec12aef308ae76d9ec92c971cf;hb=refs%2Fchanges%2F63%2F82863%2F1;hp=efe4c19c5910a75c67f48661a131cbfb8722c59c;hpb=908a421ebb9f54b3b268082ac562dded5cf51b87;p=controller.git diff --git a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/CommitCoordinationTask.java b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/CommitCoordinationTask.java index efe4c19c59..ea581ee89a 100644 --- a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/CommitCoordinationTask.java +++ b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/CommitCoordinationTask.java @@ -9,6 +9,7 @@ package org.opendaylight.controller.md.sal.dom.broker.impl; import com.google.common.base.Preconditions; +import com.google.common.base.Supplier; import com.google.common.base.Throwables; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; @@ -26,7 +27,8 @@ import org.slf4j.LoggerFactory; * Implementation of blocking three-phase commit-coordination tasks without * support of cancellation. */ -final class CommitCoordinationTask implements Callable { +@Deprecated +final class CommitCoordinationTask implements Callable { private enum Phase { canCommit, preCommit, @@ -37,17 +39,20 @@ final class CommitCoordinationTask implements Callable { private final Collection cohorts; private final DurationStatisticsTracker commitStatTracker; private final DOMDataWriteTransaction tx; + private final Supplier futureValueSupplier; - public CommitCoordinationTask(final DOMDataWriteTransaction transaction, + CommitCoordinationTask(final DOMDataWriteTransaction transaction, final Collection cohorts, - final DurationStatisticsTracker commitStatTracker) { + final DurationStatisticsTracker commitStatTracker, + final Supplier futureValueSupplier) { this.tx = Preconditions.checkNotNull(transaction, "transaction must not be null"); this.cohorts = Preconditions.checkNotNull(cohorts, "cohorts must not be null"); this.commitStatTracker = commitStatTracker; + this.futureValueSupplier = futureValueSupplier; } @Override - public Void call() throws TransactionCommitFailedException { + public T call() throws TransactionCommitFailedException { final long startTime = commitStatTracker != null ? System.nanoTime() : 0; Phase phase = Phase.canCommit; @@ -65,7 +70,7 @@ final class CommitCoordinationTask implements Callable { commitBlocking(); LOG.debug("Transaction {}: doCommit completed", tx.getIdentifier()); - return null; + return futureValueSupplier.get(); } catch (final TransactionCommitFailedException e) { LOG.warn("Tx: {} Error during phase {}, starting Abort", tx.getIdentifier(), phase, e); abortBlocking(e); @@ -78,10 +83,10 @@ final class CommitCoordinationTask implements Callable { } /** - * * Invokes canCommit on underlying cohorts and blocks till * all results are returned. * + *

* Valid state transition is from SUBMITTED to CAN_COMMIT, * if currentPhase is not SUBMITTED throws IllegalStateException. * @@ -103,11 +108,11 @@ final class CommitCoordinationTask implements Callable { } /** - * * Invokes canCommit on underlying cohorts and returns composite future * which will contains {@link Boolean#TRUE} only and only if * all cohorts returned true. * + *

* Valid state transition is from SUBMITTED to CAN_COMMIT, * if currentPhase is not SUBMITTED throws IllegalStateException. * @@ -116,18 +121,18 @@ final class CommitCoordinationTask implements Callable { */ private ListenableFuture[] canCommitAll() { final ListenableFuture[] ops = new ListenableFuture[cohorts.size()]; - int i = 0; + int index = 0; for (final DOMStoreThreePhaseCommitCohort cohort : cohorts) { - ops[i++] = cohort.canCommit(); + ops[index++] = cohort.canCommit(); } return ops; } /** - * * Invokes preCommit on underlying cohorts and blocks till * all results are returned. * + *

* Valid state transition is from CAN_COMMIT to PRE_COMMIT, if current * state is not CAN_COMMIT * throws IllegalStateException. @@ -139,7 +144,7 @@ final class CommitCoordinationTask implements Callable { private void preCommitBlocking() throws TransactionCommitFailedException { final ListenableFuture[] preCommitFutures = preCommitAll(); try { - for(final ListenableFuture future : preCommitFutures) { + for (final ListenableFuture future : preCommitFutures) { future.get(); } } catch (InterruptedException | ExecutionException e) { @@ -148,12 +153,11 @@ final class CommitCoordinationTask implements Callable { } /** - * * Invokes preCommit on underlying cohorts and returns future * which will complete once all preCommit on cohorts completed or * failed. * - * + *

* Valid state transition is from CAN_COMMIT to PRE_COMMIT, if current * state is not CAN_COMMIT * throws IllegalStateException. @@ -163,18 +167,18 @@ final class CommitCoordinationTask implements Callable { */ private ListenableFuture[] preCommitAll() { final ListenableFuture[] ops = new ListenableFuture[cohorts.size()]; - int i = 0; + int index = 0; for (final DOMStoreThreePhaseCommitCohort cohort : cohorts) { - ops[i++] = cohort.preCommit(); + ops[index++] = cohort.preCommit(); } return ops; } /** - * * Invokes commit on underlying cohorts and blocks till * all results are returned. * + *

* Valid state transition is from PRE_COMMIT to COMMIT, if not throws * IllegalStateException. * @@ -185,7 +189,7 @@ final class CommitCoordinationTask implements Callable { private void commitBlocking() throws TransactionCommitFailedException { final ListenableFuture[] commitFutures = commitAll(); try { - for(final ListenableFuture future : commitFutures) { + for (final ListenableFuture future : commitFutures) { future.get(); } } catch (InterruptedException | ExecutionException e) { @@ -194,11 +198,11 @@ final class CommitCoordinationTask implements Callable { } /** - * * Invokes commit on underlying cohorts and returns future which * completes * once all commits on cohorts are completed. * + *

* Valid state transition is from PRE_COMMIT to COMMIT, if not throws * IllegalStateException * @@ -206,9 +210,9 @@ final class CommitCoordinationTask implements Callable { */ private ListenableFuture[] commitAll() { final ListenableFuture[] ops = new ListenableFuture[cohorts.size()]; - int i = 0; + int index = 0; for (final DOMStoreThreePhaseCommitCohort cohort : cohorts) { - ops[i++] = cohort.commit(); + ops[index++] = cohort.commit(); } return ops; } @@ -216,12 +220,14 @@ final class CommitCoordinationTask implements Callable { /** * Aborts transaction. * + *

* Invokes {@link DOMStoreThreePhaseCommitCohort#abort()} on all * cohorts, blocks * for all results. If any of the abort failed throws * IllegalStateException, * which will contains originalCause as suppressed Exception. * + *

* If aborts we're successful throws supplied exception * * @param originalCause @@ -235,7 +241,8 @@ final class CommitCoordinationTask implements Callable { * @throws IllegalStateException * if abort failed. */ - private void abortBlocking(final TransactionCommitFailedException originalCause) throws TransactionCommitFailedException { + private void abortBlocking( + final TransactionCommitFailedException originalCause) throws TransactionCommitFailedException { Exception cause = originalCause; try { abortAsyncAll().get(); @@ -258,9 +265,9 @@ final class CommitCoordinationTask implements Callable { private ListenableFuture abortAsyncAll() { final ListenableFuture[] ops = new ListenableFuture[cohorts.size()]; - int i = 0; + int index = 0; for (final DOMStoreThreePhaseCommitCohort cohort : cohorts) { - ops[i++] = cohort.abort(); + ops[index++] = cohort.abort(); } /*