/*
* Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v1.0 which accompanies this distribution,
* and is available at http://www.eclipse.org/legal/epl-v10.html
*/
+
package org.opendaylight.controller.md.sal.dom.broker.impl;
import com.google.common.base.Preconditions;
+import com.google.common.base.Supplier;
import com.google.common.base.Throwables;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
* Implementation of blocking three-phase commit-coordination tasks without
* support of cancellation.
*/
-final class CommitCoordinationTask implements Callable<Void> {
- private static enum Phase {
+final class CommitCoordinationTask<T> implements Callable<T> {
+ private enum Phase {
canCommit,
preCommit,
doCommit,
- };
+ }
private static final Logger LOG = LoggerFactory.getLogger(CommitCoordinationTask.class);
private final Collection<DOMStoreThreePhaseCommitCohort> cohorts;
private final DurationStatisticsTracker commitStatTracker;
private final DOMDataWriteTransaction tx;
+ private final Supplier<T> futureValueSupplier;
- public CommitCoordinationTask(final DOMDataWriteTransaction transaction,
+ CommitCoordinationTask(final DOMDataWriteTransaction transaction,
final Collection<DOMStoreThreePhaseCommitCohort> cohorts,
- final DurationStatisticsTracker commitStatTracker) {
+ final DurationStatisticsTracker commitStatTracker,
+ final Supplier<T> futureValueSupplier) {
this.tx = Preconditions.checkNotNull(transaction, "transaction must not be null");
this.cohorts = Preconditions.checkNotNull(cohorts, "cohorts must not be null");
this.commitStatTracker = commitStatTracker;
+ this.futureValueSupplier = futureValueSupplier;
}
@Override
- public Void call() throws TransactionCommitFailedException {
+ public T call() throws TransactionCommitFailedException {
final long startTime = commitStatTracker != null ? System.nanoTime() : 0;
Phase phase = Phase.canCommit;
commitBlocking();
LOG.debug("Transaction {}: doCommit completed", tx.getIdentifier());
- return null;
- } catch (TransactionCommitFailedException e) {
+ return futureValueSupplier.get();
+ } catch (final TransactionCommitFailedException e) {
LOG.warn("Tx: {} Error during phase {}, starting Abort", tx.getIdentifier(), phase, e);
abortBlocking(e);
throw e;
}
/**
- *
* Invokes canCommit on underlying cohorts and blocks till
* all results are returned.
*
+ * <p>
* Valid state transition is from SUBMITTED to CAN_COMMIT,
* if currentPhase is not SUBMITTED throws IllegalStateException.
*
*
*/
private void canCommitBlocking() throws TransactionCommitFailedException {
- for (ListenableFuture<?> canCommit : canCommitAll()) {
+ for (final ListenableFuture<?> canCommit : canCommitAll()) {
try {
final Boolean result = (Boolean)canCommit.get();
if (result == null || !result) {
}
/**
- *
* Invokes canCommit on underlying cohorts and returns composite future
* which will contains {@link Boolean#TRUE} only and only if
* all cohorts returned true.
*
+ * <p>
* Valid state transition is from SUBMITTED to CAN_COMMIT,
* if currentPhase is not SUBMITTED throws IllegalStateException.
*
*/
private ListenableFuture<?>[] canCommitAll() {
final ListenableFuture<?>[] ops = new ListenableFuture<?>[cohorts.size()];
- int i = 0;
- for (DOMStoreThreePhaseCommitCohort cohort : cohorts) {
- ops[i++] = cohort.canCommit();
+ int index = 0;
+ for (final DOMStoreThreePhaseCommitCohort cohort : cohorts) {
+ ops[index++] = cohort.canCommit();
}
return ops;
}
/**
- *
* Invokes preCommit on underlying cohorts and blocks till
* all results are returned.
*
+ * <p>
* Valid state transition is from CAN_COMMIT to PRE_COMMIT, if current
* state is not CAN_COMMIT
* throws IllegalStateException.
private void preCommitBlocking() throws TransactionCommitFailedException {
final ListenableFuture<?>[] preCommitFutures = preCommitAll();
try {
- for(ListenableFuture<?> future : preCommitFutures) {
+ for (final ListenableFuture<?> future : preCommitFutures) {
future.get();
}
} catch (InterruptedException | ExecutionException e) {
}
/**
- *
* Invokes preCommit on underlying cohorts and returns future
* which will complete once all preCommit on cohorts completed or
* failed.
*
- *
+ * <p>
* Valid state transition is from CAN_COMMIT to PRE_COMMIT, if current
* state is not CAN_COMMIT
* throws IllegalStateException.
*/
private ListenableFuture<?>[] preCommitAll() {
final ListenableFuture<?>[] ops = new ListenableFuture<?>[cohorts.size()];
- int i = 0;
- for (DOMStoreThreePhaseCommitCohort cohort : cohorts) {
- ops[i++] = cohort.preCommit();
+ int index = 0;
+ for (final DOMStoreThreePhaseCommitCohort cohort : cohorts) {
+ ops[index++] = cohort.preCommit();
}
return ops;
}
/**
- *
* Invokes commit on underlying cohorts and blocks till
* all results are returned.
*
+ * <p>
* Valid state transition is from PRE_COMMIT to COMMIT, if not throws
* IllegalStateException.
*
private void commitBlocking() throws TransactionCommitFailedException {
final ListenableFuture<?>[] commitFutures = commitAll();
try {
- for(ListenableFuture<?> future : commitFutures) {
+ for (final ListenableFuture<?> future : commitFutures) {
future.get();
}
} catch (InterruptedException | ExecutionException e) {
}
/**
- *
* Invokes commit on underlying cohorts and returns future which
* completes
* once all commits on cohorts are completed.
*
+ * <p>
* Valid state transition is from PRE_COMMIT to COMMIT, if not throws
* IllegalStateException
*
*/
private ListenableFuture<?>[] commitAll() {
final ListenableFuture<?>[] ops = new ListenableFuture<?>[cohorts.size()];
- int i = 0;
- for (DOMStoreThreePhaseCommitCohort cohort : cohorts) {
- ops[i++] = cohort.commit();
+ int index = 0;
+ for (final DOMStoreThreePhaseCommitCohort cohort : cohorts) {
+ ops[index++] = cohort.commit();
}
return ops;
}
/**
* Aborts transaction.
*
+ * <p>
* Invokes {@link DOMStoreThreePhaseCommitCohort#abort()} on all
* cohorts, blocks
* for all results. If any of the abort failed throws
* IllegalStateException,
* which will contains originalCause as suppressed Exception.
*
+ * <p>
* If aborts we're successful throws supplied exception
*
* @param originalCause
* @throws IllegalStateException
* if abort failed.
*/
- private void abortBlocking(final TransactionCommitFailedException originalCause) throws TransactionCommitFailedException {
+ private void abortBlocking(
+ final TransactionCommitFailedException originalCause) throws TransactionCommitFailedException {
Exception cause = originalCause;
try {
abortAsyncAll().get();
* @return Future which will complete once all cohorts completed
* abort.
*/
+ @SuppressWarnings({"unchecked", "rawtypes"})
private ListenableFuture<Void> abortAsyncAll() {
final ListenableFuture<?>[] ops = new ListenableFuture<?>[cohorts.size()];
- int i = 0;
- for (DOMStoreThreePhaseCommitCohort cohort : cohorts) {
- ops[i++] = cohort.abort();
+ int index = 0;
+ for (final DOMStoreThreePhaseCommitCohort cohort : cohorts) {
+ ops[index++] = cohort.abort();
}
/*
* order to fail composite future if any of them failed.
* See Futures.allAsList for this description.
*/
- @SuppressWarnings({ "unchecked", "rawtypes" })
- ListenableFuture<Void> compositeResult = (ListenableFuture) Futures.allAsList(ops);
- return compositeResult;
+ return (ListenableFuture) Futures.allAsList(ops);
}
}