-
- /**
- *
- * Phase of 3PC commit
- *
- * Represents phase of 3PC Commit
- *
- *
- */
- private static enum CommitPhase {
- /**
- *
- * Commit Coordination Task is submitted for executing
- *
- */
- SUBMITTED,
- /**
- * Commit Coordination Task is in can commit phase of 3PC
- *
- */
- CAN_COMMIT,
- /**
- * Commit Coordination Task is in pre-commit phase of 3PC
- *
- */
- PRE_COMMIT,
- /**
- * Commit Coordination Task is in commit phase of 3PC
- *
- */
- COMMIT,
- /**
- * Commit Coordination Task is in abort phase of 3PC
- *
- */
- ABORT
- }
-
- /**
- *
- * Implementation of blocking three-phase commit-coordination tasks without
- * support of cancelation.
- *
- */
- private static class CommitCoordinationTask implements Callable<Void> {
-
- private final DOMDataWriteTransaction tx;
- private final Iterable<DOMStoreThreePhaseCommitCohort> cohorts;
- private final DurationStatsTracker commitStatTracker;
-
- @GuardedBy("this")
- private CommitPhase currentPhase;
-
- public CommitCoordinationTask(final DOMDataWriteTransaction transaction,
- final Iterable<DOMStoreThreePhaseCommitCohort> cohorts,
- final Optional<DOMDataCommitErrorListener> listener,
- final DurationStatsTracker commitStatTracker) {
- this.tx = Preconditions.checkNotNull(transaction, "transaction must not be null");
- this.cohorts = Preconditions.checkNotNull(cohorts, "cohorts must not be null");
- this.currentPhase = CommitPhase.SUBMITTED;
- this.commitStatTracker = commitStatTracker;
- }
-
- @Override
- public Void call() throws TransactionCommitFailedException {
-
- long startTime = System.nanoTime();
- try {
- canCommitBlocking();
- preCommitBlocking();
- commitBlocking();
- return null;
- } catch (TransactionCommitFailedException e) {
- LOG.warn("Tx: {} Error during phase {}, starting Abort", tx.getIdentifier(), currentPhase, e);
- abortBlocking(e);
- throw e;
- } finally {
- if(commitStatTracker != null) {
- commitStatTracker.addDuration(System.nanoTime() - startTime);
- }
- }
- }
-
- /**
- *
- * Invokes canCommit on underlying cohorts and blocks till
- * all results are returned.
- *
- * Valid state transition is from SUBMITTED to CAN_COMMIT,
- * if currentPhase is not SUBMITTED throws IllegalStateException.
- *
- * @throws TransactionCommitFailedException
- * If one of cohorts failed can Commit
- *
- */
- private void canCommitBlocking() throws TransactionCommitFailedException {
- final Boolean canCommitResult = canCommitAll().checkedGet();
- if (!canCommitResult) {
- throw new TransactionCommitFailedException("Can Commit failed, no detailed cause available.");
- }
- }
-
- /**
- *
- * Invokes preCommit on underlying cohorts and blocks till
- * all results are returned.
- *
- * Valid state transition is from CAN_COMMIT to PRE_COMMIT, if current
- * state is not CAN_COMMIT
- * throws IllegalStateException.
- *
- * @throws TransactionCommitFailedException
- * If one of cohorts failed preCommit
- *
- */
- private void preCommitBlocking() throws TransactionCommitFailedException {
- preCommitAll().checkedGet();
- }
-
- /**
- *
- * Invokes commit on underlying cohorts and blocks till
- * all results are returned.
- *
- * Valid state transition is from PRE_COMMIT to COMMIT, if not throws
- * IllegalStateException.
- *
- * @throws TransactionCommitFailedException
- * If one of cohorts failed preCommit
- *
- */
- private void commitBlocking() throws TransactionCommitFailedException {
- commitAll().checkedGet();
- }
-
- /**
- * Aborts transaction.
- *
- * Invokes {@link DOMStoreThreePhaseCommitCohort#abort()} on all
- * cohorts, blocks
- * for all results. If any of the abort failed throws
- * IllegalStateException,
- * which will contains originalCause as suppressed Exception.
- *
- * If aborts we're successful throws supplied exception
- *
- * @param originalCause
- * Exception which should be used to fail transaction for
- * consumers of transaction
- * future and listeners of transaction failure.
- * @throws TransactionCommitFailedException
- * on invocation of this method.
- * originalCa
- * @throws IllegalStateException
- * if abort failed.
- */
- private void abortBlocking(final TransactionCommitFailedException originalCause)
- throws TransactionCommitFailedException {
- LOG.warn("Tx: {} Error during phase {}, starting Abort", tx.getIdentifier(), currentPhase, originalCause);
- Exception cause = originalCause;
- try {
- abortAsyncAll().get();
- } catch (InterruptedException | ExecutionException e) {
- LOG.error("Tx: {} Error during Abort.", tx.getIdentifier(), e);
- cause = new IllegalStateException("Abort failed.", e);
- cause.addSuppressed(e);
- }
- Throwables.propagateIfPossible(cause, TransactionCommitFailedException.class);
- }
-
- /**
- *
- * Invokes preCommit on underlying cohorts and returns future
- * which will complete once all preCommit on cohorts completed or
- * failed.
- *
- *
- * Valid state transition is from CAN_COMMIT to PRE_COMMIT, if current
- * state is not CAN_COMMIT
- * throws IllegalStateException.
- *
- * @return Future which will complete once all cohorts completed
- * preCommit.
- * Future throws TransactionCommitFailedException
- * If any of cohorts failed preCommit
- *
- */
- private CheckedFuture<Void, TransactionCommitFailedException> preCommitAll() {
- changeStateFrom(CommitPhase.CAN_COMMIT, CommitPhase.PRE_COMMIT);
- Builder<ListenableFuture<Void>> ops = ImmutableList.builder();
- for (DOMStoreThreePhaseCommitCohort cohort : cohorts) {
- ops.add(cohort.preCommit());
- }
- /*
- * We are returing all futures as list, not only succeeded ones in
- * order to fail composite future if any of them failed.
- * See Futures.allAsList for this description.
- */
- @SuppressWarnings({ "unchecked", "rawtypes" })
- ListenableFuture<Void> compositeResult = (ListenableFuture) Futures.allAsList(ops.build());
- return MappingCheckedFuture.create(compositeResult,
- TransactionCommitFailedExceptionMapper.PRE_COMMIT_MAPPER);
- }
-
- /**
- *
- * Invokes commit on underlying cohorts and returns future which
- * completes
- * once all commits on cohorts are completed.
- *
- * Valid state transition is from PRE_COMMIT to COMMIT, if not throws
- * IllegalStateException
- *
- * @return Future which will complete once all cohorts completed
- * commit.
- * Future throws TransactionCommitFailedException
- * If any of cohorts failed preCommit
- *
- */
- private CheckedFuture<Void, TransactionCommitFailedException> commitAll() {
- changeStateFrom(CommitPhase.PRE_COMMIT, CommitPhase.COMMIT);
- Builder<ListenableFuture<Void>> ops = ImmutableList.builder();
- for (DOMStoreThreePhaseCommitCohort cohort : cohorts) {
- ops.add(cohort.commit());
- }
- /*
- * We are returing all futures as list, not only succeeded ones in
- * order to fail composite future if any of them failed.
- * See Futures.allAsList for this description.
- */
- @SuppressWarnings({ "unchecked", "rawtypes" })
- ListenableFuture<Void> compositeResult = (ListenableFuture) Futures.allAsList(ops.build());
- return MappingCheckedFuture.create(compositeResult,
- TransactionCommitFailedExceptionMapper.COMMIT_ERROR_MAPPER);
- }
-
- /**
- *
- * Invokes canCommit on underlying cohorts and returns composite future
- * which will contains {@link Boolean#TRUE} only and only if
- * all cohorts returned true.
- *
- * Valid state transition is from SUBMITTED to CAN_COMMIT,
- * if currentPhase is not SUBMITTED throws IllegalStateException.
- *
- * @return Future which will complete once all cohorts completed
- * preCommit.
- * Future throws TransactionCommitFailedException
- * If any of cohorts failed preCommit
- *
- */
- private CheckedFuture<Boolean, TransactionCommitFailedException> canCommitAll() {
- changeStateFrom(CommitPhase.SUBMITTED, CommitPhase.CAN_COMMIT);
- Builder<ListenableFuture<Boolean>> canCommitOperations = ImmutableList.builder();
- for (DOMStoreThreePhaseCommitCohort cohort : cohorts) {
- canCommitOperations.add(cohort.canCommit());
- }
- ListenableFuture<List<Boolean>> allCanCommits = Futures.allAsList(canCommitOperations.build());
- ListenableFuture<Boolean> allSuccessFuture = Futures.transform(allCanCommits, AND_FUNCTION);
- return MappingCheckedFuture.create(allSuccessFuture,
- TransactionCommitFailedExceptionMapper.CAN_COMMIT_ERROR_MAPPER);
-
- }
-
- /**
- *
- * Invokes abort on underlying cohorts and returns future which
- * completes
- * once all abort on cohorts are completed.
- *
- * @return Future which will complete once all cohorts completed
- * abort.
- *
- */
- private ListenableFuture<Void> abortAsyncAll() {
- changeStateFrom(currentPhase, CommitPhase.ABORT);
- Builder<ListenableFuture<Void>> ops = ImmutableList.builder();
- for (DOMStoreThreePhaseCommitCohort cohort : cohorts) {
- ops.add(cohort.abort());
- }
- /*
- * We are returing all futures as list, not only succeeded ones in
- * order to fail composite future if any of them failed.
- * See Futures.allAsList for this description.
- */
- @SuppressWarnings({ "unchecked", "rawtypes" })
- ListenableFuture<Void> compositeResult = (ListenableFuture) Futures.allAsList(ops.build());
- return compositeResult;
- }
-
- /**
- * Change phase / state of transaction from expected value to new value
- *
- * This method checks state and updates state to new state of
- * of this task if current state equals expected state.
- * If expected state and current state are different raises
- * IllegalStateException
- * which means there is probably bug in implementation of commit
- * coordination.
- *
- * If transition is successful, it logs transition on DEBUG level.
- *
- * @param currentExpected
- * Required phase for change of state
- * @param newState
- * New Phase which will be entered by transaction.
- * @throws IllegalStateException
- * If currentState of task does not match expected state
- */
- private synchronized void changeStateFrom(final CommitPhase currentExpected, final CommitPhase newState) {
- Preconditions.checkState(currentPhase.equals(currentExpected),
- "Invalid state transition: Tx: %s current state: %s new state: %s", tx.getIdentifier(),
- currentPhase, newState);
- LOG.debug("Transaction {}: Phase {} Started ", tx.getIdentifier(), newState);
- currentPhase = newState;
- };
-
- }
-