X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-dom-broker%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fmd%2Fsal%2Fdom%2Fbroker%2Fimpl%2FDOMDataCommitCoordinatorImpl.java;h=15d7b1d966e1a59e028616bb85f5670f5805860b;hp=13a0093d340b7ef3bc496cf6950d1118ad309ab5;hb=976cf80f542309baf9d5e0ec82f4f6991f309f47;hpb=6500c7a216b78417e65cdf882d2b78de27736a97 diff --git a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMDataCommitCoordinatorImpl.java b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMDataCommitCoordinatorImpl.java index 13a0093d34..15d7b1d966 100644 --- a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMDataCommitCoordinatorImpl.java +++ b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMDataCommitCoordinatorImpl.java @@ -6,32 +6,25 @@ */ package org.opendaylight.controller.md.sal.dom.broker.impl; -import java.util.List; +import com.google.common.base.Preconditions; +import com.google.common.base.Throwables; +import com.google.common.collect.Iterables; +import com.google.common.util.concurrent.CheckedFuture; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; - -import javax.annotation.concurrent.GuardedBy; - -import org.opendaylight.controller.md.sal.common.api.TransactionStatus; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException; import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction; import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; -import org.opendaylight.yangtools.yang.common.RpcResult; -import org.opendaylight.yangtools.yang.common.RpcResultBuilder; +import org.opendaylight.yangtools.util.DurationStatsTracker; +import org.opendaylight.yangtools.util.concurrent.MappingCheckedFuture; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.base.Function; -import com.google.common.base.Optional; -import com.google.common.base.Preconditions; -import com.google.common.base.Throwables; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableList.Builder; -import com.google.common.util.concurrent.CheckedFuture; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.ListeningExecutorService; - /** * * Implementation of blocking three phase commit coordinator, which which @@ -49,25 +42,7 @@ import com.google.common.util.concurrent.ListeningExecutorService; public class DOMDataCommitCoordinatorImpl implements DOMDataCommitExecutor { private static final Logger LOG = LoggerFactory.getLogger(DOMDataCommitCoordinatorImpl.class); - - /** - * Runs AND binary operation between all booleans in supplied iteration of booleans. - * - * This method will stop evaluating iterables if first found is false. - */ - private static final Function, Boolean> AND_FUNCTION = new Function, Boolean>() { - - @Override - public Boolean apply(final Iterable input) { - for(boolean value : input) { - if(!value) { - return Boolean.FALSE; - } - } - return Boolean.TRUE; - } - }; - + private final DurationStatsTracker commitStatsTracker = new DurationStatsTracker(); private final ListeningExecutorService executor; /** @@ -81,19 +56,31 @@ public class DOMDataCommitCoordinatorImpl implements DOMDataCommitExecutor { this.executor = Preconditions.checkNotNull(executor, "executor must not be null."); } + public DurationStatsTracker getCommitStatsTracker() { + return commitStatsTracker; + } + @Override - public ListenableFuture> submit(final DOMDataWriteTransaction transaction, - final Iterable cohorts, final Optional listener) { + public CheckedFuture submit(final DOMDataWriteTransaction transaction, + final Iterable cohorts) { Preconditions.checkArgument(transaction != null, "Transaction must not be null."); Preconditions.checkArgument(cohorts != null, "Cohorts must not be null."); - Preconditions.checkArgument(listener != null, "Listener must not be null"); LOG.debug("Tx: {} is submitted for execution.", transaction.getIdentifier()); - ListenableFuture> commitFuture = executor.submit(new CommitCoordinationTask( - transaction, cohorts, listener)); - if (listener.isPresent()) { - Futures.addCallback(commitFuture, new DOMDataCommitErrorInvoker(transaction, listener.get())); + + ListenableFuture commitFuture = null; + try { + commitFuture = executor.submit(new CommitCoordinationTask(transaction, cohorts, + commitStatsTracker)); + } catch(RejectedExecutionException e) { + LOG.error("The commit executor's queue is full - submit task was rejected. \n" + + executor, e); + return Futures.immediateFailedCheckedFuture( + new TransactionCommitFailedException( + "Could not submit the commit task - the commit queue capacity has been exceeded.", e)); } - return commitFuture; + + return MappingCheckedFuture.create(commitFuture, + TransactionCommitFailedExceptionMapper.COMMIT_ERROR_MAPPER); } /** @@ -134,38 +121,45 @@ public class DOMDataCommitCoordinatorImpl implements DOMDataCommitExecutor { } /** - * * Implementation of blocking three-phase commit-coordination tasks without - * support of cancelation. - * + * support of cancellation. */ - private static class CommitCoordinationTask implements Callable> { - + private static final class CommitCoordinationTask implements Callable { + private static final AtomicReferenceFieldUpdater PHASE_UPDATER = + AtomicReferenceFieldUpdater.newUpdater(CommitCoordinationTask.class, CommitPhase.class, "currentPhase"); private final DOMDataWriteTransaction tx; private final Iterable cohorts; - - @GuardedBy("this") - private CommitPhase currentPhase; + private final DurationStatsTracker commitStatTracker; + private final int cohortSize; + private volatile CommitPhase currentPhase = CommitPhase.SUBMITTED; public CommitCoordinationTask(final DOMDataWriteTransaction transaction, final Iterable cohorts, - final Optional listener) { + final DurationStatsTracker commitStatTracker) { this.tx = Preconditions.checkNotNull(transaction, "transaction must not be null"); this.cohorts = Preconditions.checkNotNull(cohorts, "cohorts must not be null"); - this.currentPhase = CommitPhase.SUBMITTED; + this.commitStatTracker = commitStatTracker; + this.cohortSize = Iterables.size(cohorts); } @Override - public RpcResult call() throws TransactionCommitFailedException { + public Void call() throws TransactionCommitFailedException { + final long startTime = commitStatTracker != null ? System.nanoTime() : 0; try { canCommitBlocking(); preCommitBlocking(); - return commitBlocking(); + commitBlocking(); + return null; } catch (TransactionCommitFailedException e) { - LOG.warn("Tx: {} Error during phase {}, starting Abort", tx.getIdentifier(), currentPhase, e); - abortBlocking(e); + final CommitPhase phase = currentPhase; + LOG.warn("Tx: {} Error during phase {}, starting Abort", tx.getIdentifier(), phase, e); + abortBlocking(e, phase); throw e; + } finally { + if (commitStatTracker != null) { + commitStatTracker.addDuration(System.nanoTime() - startTime); + } } } @@ -182,79 +176,63 @@ public class DOMDataCommitCoordinatorImpl implements DOMDataCommitExecutor { * */ private void canCommitBlocking() throws TransactionCommitFailedException { - final Boolean canCommitResult = canCommitAll().checkedGet(); - if (!canCommitResult) { - throw new TransactionCommitFailedException("Can Commit failed, no detailed cause available."); + for (ListenableFuture canCommit : canCommitAll()) { + try { + final Boolean result = (Boolean)canCommit.get(); + if (result == null || !result) { + throw new TransactionCommitFailedException("Can Commit failed, no detailed cause available."); + } + } catch (InterruptedException | ExecutionException e) { + throw TransactionCommitFailedExceptionMapper.CAN_COMMIT_ERROR_MAPPER.apply(e); + } } } /** * - * Invokes preCommit on underlying cohorts and blocks till - * all results are returned. + * Invokes canCommit on underlying cohorts and returns composite future + * which will contains {@link Boolean#TRUE} only and only if + * all cohorts returned true. * - * Valid state transition is from CAN_COMMIT to PRE_COMMIT, if current - * state is not CAN_COMMIT - * throws IllegalStateException. + * Valid state transition is from SUBMITTED to CAN_COMMIT, + * if currentPhase is not SUBMITTED throws IllegalStateException. * - * @throws TransactionCommitFailedException - * If one of cohorts failed preCommit + * @return List of all cohorts futures from can commit phase. * */ - private void preCommitBlocking() throws TransactionCommitFailedException { - preCommitAll().checkedGet(); + private ListenableFuture[] canCommitAll() { + changeStateFrom(CommitPhase.SUBMITTED, CommitPhase.CAN_COMMIT); + + final ListenableFuture[] ops = new ListenableFuture[cohortSize]; + int i = 0; + for (DOMStoreThreePhaseCommitCohort cohort : cohorts) { + ops[i++] = cohort.canCommit(); + } + return ops; } /** * - * Invokes commit on underlying cohorts and blocks till + * Invokes preCommit on underlying cohorts and blocks till * all results are returned. * - * Valid state transition is from PRE_COMMIT to COMMIT, if not throws - * IllegalStateException. + * Valid state transition is from CAN_COMMIT to PRE_COMMIT, if current + * state is not CAN_COMMIT + * throws IllegalStateException. * * @throws TransactionCommitFailedException * If one of cohorts failed preCommit * */ - private RpcResult commitBlocking() throws TransactionCommitFailedException { - commitAll().checkedGet(); - return RpcResultBuilder.success(TransactionStatus.COMMITED).build(); - } - - /** - * Aborts transaction. - * - * Invokes {@link DOMStoreThreePhaseCommitCohort#abort()} on all - * cohorts, blocks - * for all results. If any of the abort failed throws - * IllegalStateException, - * which will contains originalCause as suppressed Exception. - * - * If aborts we're successful throws supplied exception - * - * @param originalCause - * Exception which should be used to fail transaction for - * consumers of transaction - * future and listeners of transaction failure. - * @throws TransactionCommitFailedException - * on invocation of this method. - * originalCa - * @throws IllegalStateException - * if abort failed. - */ - private void abortBlocking(final TransactionCommitFailedException originalCause) - throws TransactionCommitFailedException { - LOG.warn("Tx: {} Error during phase {}, starting Abort", tx.getIdentifier(), currentPhase, originalCause); - Exception cause = originalCause; + private void preCommitBlocking() throws TransactionCommitFailedException { + final ListenableFuture[] preCommitFutures = preCommitAll(); try { - abortAsyncAll().get(); + for(ListenableFuture future : preCommitFutures) { + future.get(); + } } catch (InterruptedException | ExecutionException e) { - LOG.error("Tx: {} Error during Abort.", tx.getIdentifier(), e); - cause = new IllegalStateException("Abort failed.", e); - cause.addSuppressed(e); + throw TransactionCommitFailedExceptionMapper.PRE_COMMIT_MAPPER.apply(e); } - Throwables.propagateIfPossible(cause, TransactionCommitFailedException.class); } /** @@ -268,26 +246,41 @@ public class DOMDataCommitCoordinatorImpl implements DOMDataCommitExecutor { * state is not CAN_COMMIT * throws IllegalStateException. * - * @return Future which will complete once all cohorts completed - * preCommit. - * Future throws TransactionCommitFailedException - * If any of cohorts failed preCommit + * @return List of all cohorts futures from can commit phase. * */ - private CheckedFuture preCommitAll() { + private ListenableFuture[] preCommitAll() { changeStateFrom(CommitPhase.CAN_COMMIT, CommitPhase.PRE_COMMIT); - Builder> ops = ImmutableList.builder(); + + final ListenableFuture[] ops = new ListenableFuture[cohortSize]; + int i = 0; for (DOMStoreThreePhaseCommitCohort cohort : cohorts) { - ops.add(cohort.preCommit()); + ops[i++] = cohort.preCommit(); + } + return ops; + } + + /** + * + * Invokes commit on underlying cohorts and blocks till + * all results are returned. + * + * Valid state transition is from PRE_COMMIT to COMMIT, if not throws + * IllegalStateException. + * + * @throws TransactionCommitFailedException + * If one of cohorts failed preCommit + * + */ + private void commitBlocking() throws TransactionCommitFailedException { + final ListenableFuture[] commitFutures = commitAll(); + try { + for(ListenableFuture future : commitFutures) { + future.get(); + } + } catch (InterruptedException | ExecutionException e) { + throw TransactionCommitFailedExceptionMapper.COMMIT_ERROR_MAPPER.apply(e); } - /* - * We are returing all futures as list, not only succeeded ones in - * order to fail composite future if any of them failed. - * See Futures.allAsList for this description. - */ - @SuppressWarnings({ "unchecked", "rawtypes" }) - ListenableFuture compositeResult = (ListenableFuture) Futures.allAsList(ops.build()); - return Futures.makeChecked(compositeResult, TransactionCommitFailedExceptionMapper.PRE_COMMIT_MAPPER); } /** @@ -299,79 +292,80 @@ public class DOMDataCommitCoordinatorImpl implements DOMDataCommitExecutor { * Valid state transition is from PRE_COMMIT to COMMIT, if not throws * IllegalStateException * - * @return Future which will complete once all cohorts completed - * commit. - * Future throws TransactionCommitFailedException - * If any of cohorts failed preCommit + * @return List of all cohorts futures from can commit phase. * */ - private CheckedFuture commitAll() { + private ListenableFuture[] commitAll() { changeStateFrom(CommitPhase.PRE_COMMIT, CommitPhase.COMMIT); - Builder> ops = ImmutableList.builder(); + + final ListenableFuture[] ops = new ListenableFuture[cohortSize]; + int i = 0; for (DOMStoreThreePhaseCommitCohort cohort : cohorts) { - ops.add(cohort.commit()); + ops[i++] = cohort.commit(); } - /* - * We are returing all futures as list, not only succeeded ones in - * order to fail composite future if any of them failed. - * See Futures.allAsList for this description. - */ - @SuppressWarnings({ "unchecked", "rawtypes" }) - ListenableFuture compositeResult = (ListenableFuture) Futures.allAsList(ops.build()); - return Futures.makeChecked(compositeResult, TransactionCommitFailedExceptionMapper.COMMIT_ERROR_MAPPER); + return ops; } /** + * Aborts transaction. * - * Invokes canCommit on underlying cohorts and returns composite future - * which will contains {@link Boolean#TRUE} only and only if - * all cohorts returned true. - * - * Valid state transition is from SUBMITTED to CAN_COMMIT, - * if currentPhase is not SUBMITTED throws IllegalStateException. + * Invokes {@link DOMStoreThreePhaseCommitCohort#abort()} on all + * cohorts, blocks + * for all results. If any of the abort failed throws + * IllegalStateException, + * which will contains originalCause as suppressed Exception. * - * @return Future which will complete once all cohorts completed - * preCommit. - * Future throws TransactionCommitFailedException - * If any of cohorts failed preCommit + * If aborts we're successful throws supplied exception * + * @param originalCause + * Exception which should be used to fail transaction for + * consumers of transaction + * future and listeners of transaction failure. + * @param phase phase in which the problem ensued + * @throws TransactionCommitFailedException + * on invocation of this method. + * originalCa + * @throws IllegalStateException + * if abort failed. */ - private CheckedFuture canCommitAll() { - changeStateFrom(CommitPhase.SUBMITTED, CommitPhase.CAN_COMMIT); - Builder> canCommitOperations = ImmutableList.builder(); - for (DOMStoreThreePhaseCommitCohort cohort : cohorts) { - canCommitOperations.add(cohort.canCommit()); + private void abortBlocking(final TransactionCommitFailedException originalCause, final CommitPhase phase) + throws TransactionCommitFailedException { + LOG.warn("Tx: {} Error during phase {}, starting Abort", tx.getIdentifier(), phase, originalCause); + Exception cause = originalCause; + try { + abortAsyncAll(phase).get(); + } catch (InterruptedException | ExecutionException e) { + LOG.error("Tx: {} Error during Abort.", tx.getIdentifier(), e); + cause = new IllegalStateException("Abort failed.", e); + cause.addSuppressed(e); } - ListenableFuture> allCanCommits = Futures.allAsList(canCommitOperations.build()); - ListenableFuture allSuccessFuture = Futures.transform(allCanCommits, AND_FUNCTION); - return Futures - .makeChecked(allSuccessFuture, TransactionCommitFailedExceptionMapper.CAN_COMMIT_ERROR_MAPPER); - + Throwables.propagateIfPossible(cause, TransactionCommitFailedException.class); } /** - * * Invokes abort on underlying cohorts and returns future which - * completes - * once all abort on cohorts are completed. + * completes once all abort on cohorts are completed. * + * @param phase phase in which the problem ensued * @return Future which will complete once all cohorts completed * abort. - * */ - private ListenableFuture abortAsyncAll() { - changeStateFrom(currentPhase, CommitPhase.ABORT); - Builder> ops = ImmutableList.builder(); + private ListenableFuture abortAsyncAll(final CommitPhase phase) { + changeStateFrom(phase, CommitPhase.ABORT); + + final ListenableFuture[] ops = new ListenableFuture[cohortSize]; + int i = 0; for (DOMStoreThreePhaseCommitCohort cohort : cohorts) { - ops.add(cohort.abort()); + ops[i++] = cohort.abort(); } + /* - * We are returing all futures as list, not only succeeded ones in + * We are returning all futures as list, not only succeeded ones in * order to fail composite future if any of them failed. * See Futures.allAsList for this description. */ @SuppressWarnings({ "unchecked", "rawtypes" }) - ListenableFuture compositeResult = (ListenableFuture) Futures.allAsList(ops.build()); + ListenableFuture compositeResult = (ListenableFuture) Futures.allAsList(ops); return compositeResult; } @@ -394,14 +388,13 @@ public class DOMDataCommitCoordinatorImpl implements DOMDataCommitExecutor { * @throws IllegalStateException * If currentState of task does not match expected state */ - private synchronized void changeStateFrom(final CommitPhase currentExpected, final CommitPhase newState) { - Preconditions.checkState(currentPhase.equals(currentExpected), - "Invalid state transition: Tx: %s current state: %s new state: %s", tx.getIdentifier(), - currentPhase, newState); - LOG.debug("Transaction {}: Phase {} Started ", tx.getIdentifier(), newState); - currentPhase = newState; - }; + private void changeStateFrom(final CommitPhase currentExpected, final CommitPhase newState) { + final boolean success = PHASE_UPDATER.compareAndSet(this, currentExpected, newState); + Preconditions.checkState(success, "Invalid state transition: Tx: %s expected: %s current: %s target: %s", + tx.getIdentifier(), currentExpected, currentPhase, newState); + LOG.debug("Transaction {}: Phase {} Started", tx.getIdentifier(), newState); + }; } }