*/
package org.opendaylight.controller.md.sal.dom.broker.impl;
-import java.util.List;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+import com.google.common.collect.Iterables;
+import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
-
-import javax.annotation.concurrent.GuardedBy;
-
-import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
-import org.opendaylight.yangtools.yang.common.RpcResult;
-import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
+import org.opendaylight.yangtools.util.DurationStatisticsTracker;
+import org.opendaylight.yangtools.util.concurrent.MappingCheckedFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.base.Function;
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Throwables;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableList.Builder;
-import com.google.common.util.concurrent.CheckedFuture;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListeningExecutorService;
-
/**
*
* Implementation of blocking three phase commit coordinator, which which
public class DOMDataCommitCoordinatorImpl implements DOMDataCommitExecutor {
private static final Logger LOG = LoggerFactory.getLogger(DOMDataCommitCoordinatorImpl.class);
-
- /**
- * Runs AND binary operation between all booleans in supplied iteration of booleans.
- *
- * This method will stop evaluating iterables if first found is false.
- */
- private static final Function<Iterable<Boolean>, Boolean> AND_FUNCTION = new Function<Iterable<Boolean>, Boolean>() {
-
- @Override
- public Boolean apply(final Iterable<Boolean> input) {
- for(boolean value : input) {
- if(!value) {
- return Boolean.FALSE;
- }
- }
- return Boolean.TRUE;
- }
- };
-
+ private final DurationStatisticsTracker commitStatsTracker = DurationStatisticsTracker.createConcurrent();
private final ListeningExecutorService executor;
/**
this.executor = Preconditions.checkNotNull(executor, "executor must not be null.");
}
+ public DurationStatisticsTracker getCommitStatsTracker() {
+ return commitStatsTracker;
+ }
+
@Override
- public ListenableFuture<RpcResult<TransactionStatus>> submit(final DOMDataWriteTransaction transaction,
- final Iterable<DOMStoreThreePhaseCommitCohort> cohorts, final Optional<DOMDataCommitErrorListener> listener) {
+ public CheckedFuture<Void,TransactionCommitFailedException> submit(final DOMDataWriteTransaction transaction,
+ final Iterable<DOMStoreThreePhaseCommitCohort> cohorts) {
Preconditions.checkArgument(transaction != null, "Transaction must not be null.");
Preconditions.checkArgument(cohorts != null, "Cohorts must not be null.");
- Preconditions.checkArgument(listener != null, "Listener must not be null");
LOG.debug("Tx: {} is submitted for execution.", transaction.getIdentifier());
- ListenableFuture<RpcResult<TransactionStatus>> commitFuture = executor.submit(new CommitCoordinationTask(
- transaction, cohorts, listener));
- if (listener.isPresent()) {
- Futures.addCallback(commitFuture, new DOMDataCommitErrorInvoker(transaction, listener.get()));
+
+ ListenableFuture<Void> commitFuture = null;
+ try {
+ commitFuture = executor.submit(new CommitCoordinationTask(transaction, cohorts,
+ commitStatsTracker));
+ } catch(RejectedExecutionException e) {
+ LOG.error("The commit executor's queue is full - submit task was rejected. \n" +
+ executor, e);
+ return Futures.immediateFailedCheckedFuture(
+ new TransactionCommitFailedException(
+ "Could not submit the commit task - the commit queue capacity has been exceeded.", e));
}
- return commitFuture;
+
+ return MappingCheckedFuture.create(commitFuture,
+ TransactionCommitFailedExceptionMapper.COMMIT_ERROR_MAPPER);
}
/**
}
/**
- *
* Implementation of blocking three-phase commit-coordination tasks without
- * support of cancelation.
- *
+ * support of cancellation.
*/
- private static class CommitCoordinationTask implements Callable<RpcResult<TransactionStatus>> {
-
+ private static final class CommitCoordinationTask implements Callable<Void> {
+ private static final AtomicReferenceFieldUpdater<CommitCoordinationTask, CommitPhase> PHASE_UPDATER =
+ AtomicReferenceFieldUpdater.newUpdater(CommitCoordinationTask.class, CommitPhase.class, "currentPhase");
private final DOMDataWriteTransaction tx;
private final Iterable<DOMStoreThreePhaseCommitCohort> cohorts;
-
- @GuardedBy("this")
- private CommitPhase currentPhase;
+ private final DurationStatisticsTracker commitStatTracker;
+ private final int cohortSize;
+ private volatile CommitPhase currentPhase = CommitPhase.SUBMITTED;
public CommitCoordinationTask(final DOMDataWriteTransaction transaction,
final Iterable<DOMStoreThreePhaseCommitCohort> cohorts,
- final Optional<DOMDataCommitErrorListener> listener) {
+ final DurationStatisticsTracker commitStatsTracker) {
this.tx = Preconditions.checkNotNull(transaction, "transaction must not be null");
this.cohorts = Preconditions.checkNotNull(cohorts, "cohorts must not be null");
- this.currentPhase = CommitPhase.SUBMITTED;
+ this.commitStatTracker = commitStatsTracker;
+ this.cohortSize = Iterables.size(cohorts);
}
@Override
- public RpcResult<TransactionStatus> call() throws TransactionCommitFailedException {
+ public Void call() throws TransactionCommitFailedException {
+ final long startTime = commitStatTracker != null ? System.nanoTime() : 0;
try {
canCommitBlocking();
preCommitBlocking();
- return commitBlocking();
+ commitBlocking();
+ return null;
} catch (TransactionCommitFailedException e) {
- LOG.warn("Tx: {} Error during phase {}, starting Abort", tx.getIdentifier(), currentPhase, e);
- abortBlocking(e);
+ final CommitPhase phase = currentPhase;
+ LOG.warn("Tx: {} Error during phase {}, starting Abort", tx.getIdentifier(), phase, e);
+ abortBlocking(e, phase);
throw e;
+ } finally {
+ if (commitStatTracker != null) {
+ commitStatTracker.addDuration(System.nanoTime() - startTime);
+ }
}
}
*
*/
private void canCommitBlocking() throws TransactionCommitFailedException {
- final Boolean canCommitResult = canCommitAll().checkedGet();
- if (!canCommitResult) {
- throw new TransactionCommitFailedException("Can Commit failed, no detailed cause available.");
+ for (ListenableFuture<?> canCommit : canCommitAll()) {
+ try {
+ final Boolean result = (Boolean)canCommit.get();
+ if (result == null || !result) {
+ throw new TransactionCommitFailedException("Can Commit failed, no detailed cause available.");
+ }
+ } catch (InterruptedException | ExecutionException e) {
+ throw TransactionCommitFailedExceptionMapper.CAN_COMMIT_ERROR_MAPPER.apply(e);
+ }
}
}
/**
*
- * Invokes preCommit on underlying cohorts and blocks till
- * all results are returned.
+ * Invokes canCommit on underlying cohorts and returns composite future
+ * which will contains {@link Boolean#TRUE} only and only if
+ * all cohorts returned true.
*
- * Valid state transition is from CAN_COMMIT to PRE_COMMIT, if current
- * state is not CAN_COMMIT
- * throws IllegalStateException.
+ * Valid state transition is from SUBMITTED to CAN_COMMIT,
+ * if currentPhase is not SUBMITTED throws IllegalStateException.
*
- * @throws TransactionCommitFailedException
- * If one of cohorts failed preCommit
+ * @return List of all cohorts futures from can commit phase.
*
*/
- private void preCommitBlocking() throws TransactionCommitFailedException {
- preCommitAll().checkedGet();
+ private ListenableFuture<?>[] canCommitAll() {
+ changeStateFrom(CommitPhase.SUBMITTED, CommitPhase.CAN_COMMIT);
+
+ final ListenableFuture<?>[] ops = new ListenableFuture<?>[cohortSize];
+ int i = 0;
+ for (DOMStoreThreePhaseCommitCohort cohort : cohorts) {
+ ops[i++] = cohort.canCommit();
+ }
+ return ops;
}
/**
*
- * Invokes commit on underlying cohorts and blocks till
+ * Invokes preCommit on underlying cohorts and blocks till
* all results are returned.
*
- * Valid state transition is from PRE_COMMIT to COMMIT, if not throws
- * IllegalStateException.
+ * Valid state transition is from CAN_COMMIT to PRE_COMMIT, if current
+ * state is not CAN_COMMIT
+ * throws IllegalStateException.
*
* @throws TransactionCommitFailedException
* If one of cohorts failed preCommit
*
*/
- private RpcResult<TransactionStatus> commitBlocking() throws TransactionCommitFailedException {
- commitAll().checkedGet();
- return RpcResultBuilder.<TransactionStatus>success(TransactionStatus.COMMITED).build();
- }
-
- /**
- * Aborts transaction.
- *
- * Invokes {@link DOMStoreThreePhaseCommitCohort#abort()} on all
- * cohorts, blocks
- * for all results. If any of the abort failed throws
- * IllegalStateException,
- * which will contains originalCause as suppressed Exception.
- *
- * If aborts we're successful throws supplied exception
- *
- * @param originalCause
- * Exception which should be used to fail transaction for
- * consumers of transaction
- * future and listeners of transaction failure.
- * @throws TransactionCommitFailedException
- * on invocation of this method.
- * originalCa
- * @throws IllegalStateException
- * if abort failed.
- */
- private void abortBlocking(final TransactionCommitFailedException originalCause)
- throws TransactionCommitFailedException {
- LOG.warn("Tx: {} Error during phase {}, starting Abort", tx.getIdentifier(), currentPhase, originalCause);
- Exception cause = originalCause;
+ private void preCommitBlocking() throws TransactionCommitFailedException {
+ final ListenableFuture<?>[] preCommitFutures = preCommitAll();
try {
- abortAsyncAll().get();
+ for(ListenableFuture<?> future : preCommitFutures) {
+ future.get();
+ }
} catch (InterruptedException | ExecutionException e) {
- LOG.error("Tx: {} Error during Abort.", tx.getIdentifier(), e);
- cause = new IllegalStateException("Abort failed.", e);
- cause.addSuppressed(e);
+ throw TransactionCommitFailedExceptionMapper.PRE_COMMIT_MAPPER.apply(e);
}
- Throwables.propagateIfPossible(cause, TransactionCommitFailedException.class);
}
/**
* state is not CAN_COMMIT
* throws IllegalStateException.
*
- * @return Future which will complete once all cohorts completed
- * preCommit.
- * Future throws TransactionCommitFailedException
- * If any of cohorts failed preCommit
+ * @return List of all cohorts futures from can commit phase.
*
*/
- private CheckedFuture<Void, TransactionCommitFailedException> preCommitAll() {
+ private ListenableFuture<?>[] preCommitAll() {
changeStateFrom(CommitPhase.CAN_COMMIT, CommitPhase.PRE_COMMIT);
- Builder<ListenableFuture<Void>> ops = ImmutableList.builder();
+
+ final ListenableFuture<?>[] ops = new ListenableFuture<?>[cohortSize];
+ int i = 0;
for (DOMStoreThreePhaseCommitCohort cohort : cohorts) {
- ops.add(cohort.preCommit());
+ ops[i++] = cohort.preCommit();
+ }
+ return ops;
+ }
+
+ /**
+ *
+ * Invokes commit on underlying cohorts and blocks till
+ * all results are returned.
+ *
+ * Valid state transition is from PRE_COMMIT to COMMIT, if not throws
+ * IllegalStateException.
+ *
+ * @throws TransactionCommitFailedException
+ * If one of cohorts failed preCommit
+ *
+ */
+ private void commitBlocking() throws TransactionCommitFailedException {
+ final ListenableFuture<?>[] commitFutures = commitAll();
+ try {
+ for(ListenableFuture<?> future : commitFutures) {
+ future.get();
+ }
+ } catch (InterruptedException | ExecutionException e) {
+ throw TransactionCommitFailedExceptionMapper.COMMIT_ERROR_MAPPER.apply(e);
}
- /*
- * We are returing all futures as list, not only succeeded ones in
- * order to fail composite future if any of them failed.
- * See Futures.allAsList for this description.
- */
- @SuppressWarnings({ "unchecked", "rawtypes" })
- ListenableFuture<Void> compositeResult = (ListenableFuture) Futures.allAsList(ops.build());
- return Futures.makeChecked(compositeResult, TransactionCommitFailedExceptionMapper.PRE_COMMIT_MAPPER);
}
/**
* Valid state transition is from PRE_COMMIT to COMMIT, if not throws
* IllegalStateException
*
- * @return Future which will complete once all cohorts completed
- * commit.
- * Future throws TransactionCommitFailedException
- * If any of cohorts failed preCommit
+ * @return List of all cohorts futures from can commit phase.
*
*/
- private CheckedFuture<Void, TransactionCommitFailedException> commitAll() {
+ private ListenableFuture<?>[] commitAll() {
changeStateFrom(CommitPhase.PRE_COMMIT, CommitPhase.COMMIT);
- Builder<ListenableFuture<Void>> ops = ImmutableList.builder();
+
+ final ListenableFuture<?>[] ops = new ListenableFuture<?>[cohortSize];
+ int i = 0;
for (DOMStoreThreePhaseCommitCohort cohort : cohorts) {
- ops.add(cohort.commit());
+ ops[i++] = cohort.commit();
}
- /*
- * We are returing all futures as list, not only succeeded ones in
- * order to fail composite future if any of them failed.
- * See Futures.allAsList for this description.
- */
- @SuppressWarnings({ "unchecked", "rawtypes" })
- ListenableFuture<Void> compositeResult = (ListenableFuture) Futures.allAsList(ops.build());
- return Futures.makeChecked(compositeResult, TransactionCommitFailedExceptionMapper.COMMIT_ERROR_MAPPER);
+ return ops;
}
/**
+ * Aborts transaction.
*
- * Invokes canCommit on underlying cohorts and returns composite future
- * which will contains {@link Boolean#TRUE} only and only if
- * all cohorts returned true.
- *
- * Valid state transition is from SUBMITTED to CAN_COMMIT,
- * if currentPhase is not SUBMITTED throws IllegalStateException.
+ * Invokes {@link DOMStoreThreePhaseCommitCohort#abort()} on all
+ * cohorts, blocks
+ * for all results. If any of the abort failed throws
+ * IllegalStateException,
+ * which will contains originalCause as suppressed Exception.
*
- * @return Future which will complete once all cohorts completed
- * preCommit.
- * Future throws TransactionCommitFailedException
- * If any of cohorts failed preCommit
+ * If aborts we're successful throws supplied exception
*
+ * @param originalCause
+ * Exception which should be used to fail transaction for
+ * consumers of transaction
+ * future and listeners of transaction failure.
+ * @param phase phase in which the problem ensued
+ * @throws TransactionCommitFailedException
+ * on invocation of this method.
+ * originalCa
+ * @throws IllegalStateException
+ * if abort failed.
*/
- private CheckedFuture<Boolean, TransactionCommitFailedException> canCommitAll() {
- changeStateFrom(CommitPhase.SUBMITTED, CommitPhase.CAN_COMMIT);
- Builder<ListenableFuture<Boolean>> canCommitOperations = ImmutableList.builder();
- for (DOMStoreThreePhaseCommitCohort cohort : cohorts) {
- canCommitOperations.add(cohort.canCommit());
+ private void abortBlocking(final TransactionCommitFailedException originalCause, final CommitPhase phase)
+ throws TransactionCommitFailedException {
+ LOG.warn("Tx: {} Error during phase {}, starting Abort", tx.getIdentifier(), phase, originalCause);
+ Exception cause = originalCause;
+ try {
+ abortAsyncAll(phase).get();
+ } catch (InterruptedException | ExecutionException e) {
+ LOG.error("Tx: {} Error during Abort.", tx.getIdentifier(), e);
+ cause = new IllegalStateException("Abort failed.", e);
+ cause.addSuppressed(e);
}
- ListenableFuture<List<Boolean>> allCanCommits = Futures.allAsList(canCommitOperations.build());
- ListenableFuture<Boolean> allSuccessFuture = Futures.transform(allCanCommits, AND_FUNCTION);
- return Futures
- .makeChecked(allSuccessFuture, TransactionCommitFailedExceptionMapper.CAN_COMMIT_ERROR_MAPPER);
-
+ Throwables.propagateIfPossible(cause, TransactionCommitFailedException.class);
}
/**
- *
* Invokes abort on underlying cohorts and returns future which
- * completes
- * once all abort on cohorts are completed.
+ * completes once all abort on cohorts are completed.
*
+ * @param phase phase in which the problem ensued
* @return Future which will complete once all cohorts completed
* abort.
- *
*/
- private ListenableFuture<Void> abortAsyncAll() {
- changeStateFrom(currentPhase, CommitPhase.ABORT);
- Builder<ListenableFuture<Void>> ops = ImmutableList.builder();
+ private ListenableFuture<Void> abortAsyncAll(final CommitPhase phase) {
+ changeStateFrom(phase, CommitPhase.ABORT);
+
+ final ListenableFuture<?>[] ops = new ListenableFuture<?>[cohortSize];
+ int i = 0;
for (DOMStoreThreePhaseCommitCohort cohort : cohorts) {
- ops.add(cohort.abort());
+ ops[i++] = cohort.abort();
}
+
/*
- * We are returing all futures as list, not only succeeded ones in
+ * We are returning all futures as list, not only succeeded ones in
* order to fail composite future if any of them failed.
* See Futures.allAsList for this description.
*/
@SuppressWarnings({ "unchecked", "rawtypes" })
- ListenableFuture<Void> compositeResult = (ListenableFuture) Futures.allAsList(ops.build());
+ ListenableFuture<Void> compositeResult = (ListenableFuture) Futures.allAsList(ops);
return compositeResult;
}
* @throws IllegalStateException
* If currentState of task does not match expected state
*/
- private synchronized void changeStateFrom(final CommitPhase currentExpected, final CommitPhase newState) {
- Preconditions.checkState(currentPhase.equals(currentExpected),
- "Invalid state transition: Tx: %s current state: %s new state: %s", tx.getIdentifier(),
- currentPhase, newState);
- LOG.debug("Transaction {}: Phase {} Started ", tx.getIdentifier(), newState);
- currentPhase = newState;
- };
+ private void changeStateFrom(final CommitPhase currentExpected, final CommitPhase newState) {
+ final boolean success = PHASE_UPDATER.compareAndSet(this, currentExpected, newState);
+ Preconditions.checkState(success, "Invalid state transition: Tx: %s expected: %s current: %s target: %s",
+ tx.getIdentifier(), currentExpected, currentPhase, newState);
+ LOG.debug("Transaction {}: Phase {} Started", tx.getIdentifier(), newState);
+ };
}
}