import static java.util.Objects.requireNonNull;
import com.google.common.base.Throwables;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
-import java.util.Collection;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import org.opendaylight.mdsal.common.api.CommitInfo;
static final class WithTracker extends CommitCoordinationTask {
private final DurationStatisticsTracker commitStatTracker;
- WithTracker(final DOMDataTreeWriteTransaction transaction,
- final Collection<DOMStoreThreePhaseCommitCohort> cohorts,
+ WithTracker(final DOMDataTreeWriteTransaction transaction, final DOMStoreThreePhaseCommitCohort cohort,
final DurationStatisticsTracker commitStatTracker) {
- super(transaction, cohorts);
+ super(transaction, cohort);
this.commitStatTracker = requireNonNull(commitStatTracker);
}
}
private static final Logger LOG = LoggerFactory.getLogger(CommitCoordinationTask.class);
- private final Collection<DOMStoreThreePhaseCommitCohort> cohorts;
+
+ private final DOMStoreThreePhaseCommitCohort cohort;
private final DOMDataTreeWriteTransaction tx;
- CommitCoordinationTask(final DOMDataTreeWriteTransaction transaction,
- final Collection<DOMStoreThreePhaseCommitCohort> cohorts) {
+ CommitCoordinationTask(final DOMDataTreeWriteTransaction transaction, final DOMStoreThreePhaseCommitCohort cohort) {
tx = requireNonNull(transaction, "transaction must not be null");
- this.cohorts = requireNonNull(cohorts, "cohorts must not be null");
+ this.cohort = requireNonNull(cohort, "cohort must not be null");
}
@Override
}
/**
- * Invokes canCommit on underlying cohorts and blocks till all results are returned.
+ * Invokes canCommit on underlying cohort and blocks till the result is returned.
*
* <p>
* Valid state transition is from SUBMITTED to CAN_COMMIT, if currentPhase is not SUBMITTED throws
* IllegalStateException.
*
- * @throws TransactionCommitFailedException If one of cohorts failed can Commit
+ * @throws TransactionCommitFailedException If cohort fails Commit
*/
@SuppressFBWarnings("BC_UNCONFIRMED_CAST_OF_RETURN_VALUE")
private void canCommitBlocking() throws TransactionCommitFailedException {
- for (final ListenableFuture<?> canCommit : canCommitAll()) {
- try {
- final Boolean result = (Boolean)canCommit.get();
- if (result == null || !result) {
- throw new TransactionCommitFailedException("Can Commit failed, no detailed cause available.");
- }
- } catch (InterruptedException | ExecutionException e) {
- throw TransactionCommitFailedExceptionMapper.CAN_COMMIT_ERROR_MAPPER.apply(e);
- }
+ final var future = cohort.canCommit();
+ final Boolean result;
+ try {
+ result = future.get();
+ } catch (InterruptedException | ExecutionException e) {
+ throw TransactionCommitFailedExceptionMapper.CAN_COMMIT_ERROR_MAPPER.apply(e);
}
- }
- /**
- * Invokes canCommit on underlying cohorts and returns composite future which will contain {@link Boolean#TRUE} only
- * and only if all cohorts returned true.
- *
- * <p>
- * Valid state transition is from SUBMITTED to CAN_COMMIT, if currentPhase is not SUBMITTED throws
- * IllegalStateException.
- *
- * @return List of all cohorts futures from can commit phase.
- */
- private ListenableFuture<?>[] canCommitAll() {
- final ListenableFuture<?>[] ops = new ListenableFuture<?>[cohorts.size()];
- int index = 0;
- for (final DOMStoreThreePhaseCommitCohort cohort : cohorts) {
- ops[index++] = cohort.canCommit();
+ if (!Boolean.TRUE.equals(result)) {
+ throw new TransactionCommitFailedException("Can Commit failed, no detailed cause available.");
}
- return ops;
}
/**
- * Invokes preCommit on underlying cohorts and blocks until all results are returned.
+ * Invokes preCommit on underlying cohort and blocks until the result is returned.
*
* <p>
* Valid state transition is from CAN_COMMIT to PRE_COMMIT, if current state is not CAN_COMMIT throws
* IllegalStateException.
*
- * @throws TransactionCommitFailedException If one of cohorts failed preCommit
+ * @throws TransactionCommitFailedException If cohort fails preCommit
*/
@SuppressFBWarnings("BC_UNCONFIRMED_CAST_OF_RETURN_VALUE")
private void preCommitBlocking() throws TransactionCommitFailedException {
- final ListenableFuture<?>[] preCommitFutures = preCommitAll();
try {
- for (final ListenableFuture<?> future : preCommitFutures) {
- future.get();
- }
+ cohort.preCommit().get();
} catch (InterruptedException | ExecutionException e) {
throw TransactionCommitFailedExceptionMapper.PRE_COMMIT_MAPPER.apply(e);
}
}
/**
- * Invokes preCommit on underlying cohorts and returns future which will complete once all preCommit on cohorts
- * completed or failed.
- *
- * <p>
- * Valid state transition is from CAN_COMMIT to PRE_COMMIT, if current state is not CAN_COMMIT throws
- * IllegalStateException.
- *
- * @return List of all cohorts futures from can commit phase.
- */
- private ListenableFuture<?>[] preCommitAll() {
- final ListenableFuture<?>[] ops = new ListenableFuture<?>[cohorts.size()];
- int index = 0;
- for (final DOMStoreThreePhaseCommitCohort cohort : cohorts) {
- ops[index++] = cohort.preCommit();
- }
- return ops;
- }
-
- /**
- * Invokes commit on underlying cohorts and blocks until all results are returned.
+ * Invokes commit on underlying cohort and blocks until result is returned.
*
* <p>
* Valid state transition is from PRE_COMMIT to COMMIT, if not throws IllegalStateException.
*
- * @throws TransactionCommitFailedException If one of cohorts failed preCommit
+ * @throws TransactionCommitFailedException If cohort fails preCommit
*/
@SuppressFBWarnings("BC_UNCONFIRMED_CAST_OF_RETURN_VALUE")
private void commitBlocking() throws TransactionCommitFailedException {
- final ListenableFuture<?>[] commitFutures = commitAll();
try {
- for (final ListenableFuture<?> future : commitFutures) {
- future.get();
- }
+ cohort.commit().get();
} catch (InterruptedException | ExecutionException e) {
throw TransactionCommitFailedExceptionMapper.COMMIT_ERROR_MAPPER.apply(e);
}
}
- /**
- * Invokes commit on underlying cohorts and returns future which
- * completes
- * once all commits on cohorts are completed.
- *
- *<p>
- * Valid state transition is from PRE_COMMIT to COMMIT, if not throws
- * IllegalStateException
- *
- * @return List of all cohorts futures from can commit phase.
- */
- private ListenableFuture<?>[] commitAll() {
- final ListenableFuture<?>[] ops = new ListenableFuture<?>[cohorts.size()];
- int index = 0;
- for (final DOMStoreThreePhaseCommitCohort cohort : cohorts) {
- ops[index++] = cohort.commit();
- }
- return ops;
- }
-
/**
* Aborts transaction.
*
* <p>
- * Invokes {@link DOMStoreThreePhaseCommitCohort#abort()} on all cohorts, blocks for all results. If any
- * of the abort failed throws IllegalStateException, which will contains originalCause as suppressed Exception.
+ * Invokes {@link DOMStoreThreePhaseCommitCohort#abort()} on underlying cohort, blocks the results. If
+ * abort failed throws IllegalStateException, which will contains originalCause as suppressed Exception.
*
* <p>
- * If aborts we're successful throws supplied exception
+ * If abort was successful throws supplied exception
*
* @param originalCause Exception which should be used to fail transaction for consumers of transaction future
* and listeners of transaction failure.
- * @param phase phase in which the problem ensued
- * @throws TransactionCommitFailedException on invocation of this method.
* @throws IllegalStateException if abort failed.
+ * @throws TransactionCommitFailedException on invocation of this method.
*/
private void abortBlocking(final TransactionCommitFailedException originalCause)
throws TransactionCommitFailedException {
Exception cause = originalCause;
try {
- abortAsyncAll().get();
+ cohort.abort().get();
} catch (InterruptedException | ExecutionException e) {
LOG.error("Tx: {} Error during Abort.", tx.getIdentifier(), e);
cause = new IllegalStateException("Abort failed.", e);
}
Throwables.propagateIfPossible(cause, TransactionCommitFailedException.class);
}
-
- /**
- * Invokes abort on underlying cohorts and returns future which completes once all abort on cohorts are completed.
- *
- * @return Future which will complete once all cohorts completed abort.
- */
- @SuppressWarnings({"unchecked", "rawtypes"})
- private ListenableFuture<Void> abortAsyncAll() {
-
- final ListenableFuture<?>[] ops = new ListenableFuture<?>[cohorts.size()];
- int index = 0;
- for (final DOMStoreThreePhaseCommitCohort cohort : cohorts) {
- ops[index++] = cohort.abort();
- }
-
- /*
- * We are returning all futures as list, not only succeeded ones in
- * order to fail composite future if any of them failed.
- * See Futures.allAsList for this description.
- */
- return (ListenableFuture) Futures.allAsList(ops);
- }
}