From f298b5a67b70daf3face69bf65483de544a6da61 Mon Sep 17 00:00:00 2001 From: Robert Varga Date: Thu, 2 Oct 2014 02:07:35 +0200 Subject: [PATCH] BUG-650: Split out CommitCoordinationTask The coordination task takes most of the coordinator implementation. We do not want to expose it, but at the same time we want to evolve task. Split it out and eliminate the unneeded atomic state updates, as it is only ever touched by a single thread. Change-Id: I49b494d23a8b8f9cdf6cb3dc33e08d22c5bf325e Signed-off-by: Robert Varga --- .../broker/impl/CommitCoordinationTask.java | 274 +++++++++++++++ .../impl/DOMDataCommitCoordinatorImpl.java | 320 ------------------ 2 files changed, 274 insertions(+), 320 deletions(-) create mode 100644 opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/CommitCoordinationTask.java diff --git a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/CommitCoordinationTask.java b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/CommitCoordinationTask.java new file mode 100644 index 0000000000..e0ac702dad --- /dev/null +++ b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/CommitCoordinationTask.java @@ -0,0 +1,274 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.md.sal.dom.broker.impl; + +import com.google.common.base.Preconditions; +import com.google.common.base.Throwables; +import com.google.common.collect.Iterables; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException; +import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; +import org.opendaylight.yangtools.util.DurationStatisticsTracker; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Implementation of blocking three-phase commit-coordination tasks without + * support of cancellation. + */ +final class CommitCoordinationTask implements Callable { + private static enum Phase { + canCommit, + preCommit, + doCommit, + }; + + private static final Logger LOG = LoggerFactory.getLogger(CommitCoordinationTask.class); + private final Iterable cohorts; + private final DurationStatisticsTracker commitStatTracker; + private final DOMDataWriteTransaction tx; + private final int cohortSize; + + public CommitCoordinationTask(final DOMDataWriteTransaction transaction, + final Iterable cohorts, + final DurationStatisticsTracker commitStatTracker) { + this.tx = Preconditions.checkNotNull(transaction, "transaction must not be null"); + this.cohorts = Preconditions.checkNotNull(cohorts, "cohorts must not be null"); + this.commitStatTracker = commitStatTracker; + this.cohortSize = Iterables.size(cohorts); + } + + @Override + public Void call() throws TransactionCommitFailedException { + final long startTime = commitStatTracker != null ? System.nanoTime() : 0; + + Phase phase = Phase.canCommit; + + try { + LOG.debug("Transaction {}: canCommit Started", tx.getIdentifier()); + canCommitBlocking(); + + phase = Phase.preCommit; + LOG.debug("Transaction {}: preCommit Started", tx.getIdentifier()); + preCommitBlocking(); + + phase = Phase.doCommit; + LOG.debug("Transaction {}: doCommit Started", tx.getIdentifier()); + commitBlocking(); + + LOG.debug("Transaction {}: doCommit completed", tx.getIdentifier()); + return null; + } catch (TransactionCommitFailedException e) { + LOG.warn("Tx: {} Error during phase {}, starting Abort", tx.getIdentifier(), phase, e); + abortBlocking(e); + throw e; + } finally { + if (commitStatTracker != null) { + commitStatTracker.addDuration(System.nanoTime() - startTime); + } + } + } + + /** + * + * Invokes canCommit on underlying cohorts and blocks till + * all results are returned. + * + * Valid state transition is from SUBMITTED to CAN_COMMIT, + * if currentPhase is not SUBMITTED throws IllegalStateException. + * + * @throws TransactionCommitFailedException + * If one of cohorts failed can Commit + * + */ + private void canCommitBlocking() throws TransactionCommitFailedException { + for (ListenableFuture canCommit : canCommitAll()) { + try { + final Boolean result = (Boolean)canCommit.get(); + if (result == null || !result) { + throw new TransactionCommitFailedException("Can Commit failed, no detailed cause available."); + } + } catch (InterruptedException | ExecutionException e) { + throw TransactionCommitFailedExceptionMapper.CAN_COMMIT_ERROR_MAPPER.apply(e); + } + } + } + + /** + * + * Invokes canCommit on underlying cohorts and returns composite future + * which will contains {@link Boolean#TRUE} only and only if + * all cohorts returned true. + * + * Valid state transition is from SUBMITTED to CAN_COMMIT, + * if currentPhase is not SUBMITTED throws IllegalStateException. + * + * @return List of all cohorts futures from can commit phase. + * + */ + private ListenableFuture[] canCommitAll() { + final ListenableFuture[] ops = new ListenableFuture[cohortSize]; + int i = 0; + for (DOMStoreThreePhaseCommitCohort cohort : cohorts) { + ops[i++] = cohort.canCommit(); + } + return ops; + } + + /** + * + * Invokes preCommit on underlying cohorts and blocks till + * all results are returned. + * + * Valid state transition is from CAN_COMMIT to PRE_COMMIT, if current + * state is not CAN_COMMIT + * throws IllegalStateException. + * + * @throws TransactionCommitFailedException + * If one of cohorts failed preCommit + * + */ + private void preCommitBlocking() throws TransactionCommitFailedException { + final ListenableFuture[] preCommitFutures = preCommitAll(); + try { + for(ListenableFuture future : preCommitFutures) { + future.get(); + } + } catch (InterruptedException | ExecutionException e) { + throw TransactionCommitFailedExceptionMapper.PRE_COMMIT_MAPPER.apply(e); + } + } + + /** + * + * Invokes preCommit on underlying cohorts and returns future + * which will complete once all preCommit on cohorts completed or + * failed. + * + * + * Valid state transition is from CAN_COMMIT to PRE_COMMIT, if current + * state is not CAN_COMMIT + * throws IllegalStateException. + * + * @return List of all cohorts futures from can commit phase. + * + */ + private ListenableFuture[] preCommitAll() { + final ListenableFuture[] ops = new ListenableFuture[cohortSize]; + int i = 0; + for (DOMStoreThreePhaseCommitCohort cohort : cohorts) { + ops[i++] = cohort.preCommit(); + } + return ops; + } + + /** + * + * Invokes commit on underlying cohorts and blocks till + * all results are returned. + * + * Valid state transition is from PRE_COMMIT to COMMIT, if not throws + * IllegalStateException. + * + * @throws TransactionCommitFailedException + * If one of cohorts failed preCommit + * + */ + private void commitBlocking() throws TransactionCommitFailedException { + final ListenableFuture[] commitFutures = commitAll(); + try { + for(ListenableFuture future : commitFutures) { + future.get(); + } + } catch (InterruptedException | ExecutionException e) { + throw TransactionCommitFailedExceptionMapper.COMMIT_ERROR_MAPPER.apply(e); + } + } + + /** + * + * Invokes commit on underlying cohorts and returns future which + * completes + * once all commits on cohorts are completed. + * + * Valid state transition is from PRE_COMMIT to COMMIT, if not throws + * IllegalStateException + * + * @return List of all cohorts futures from can commit phase. + */ + private ListenableFuture[] commitAll() { + final ListenableFuture[] ops = new ListenableFuture[cohortSize]; + int i = 0; + for (DOMStoreThreePhaseCommitCohort cohort : cohorts) { + ops[i++] = cohort.commit(); + } + return ops; + } + + /** + * Aborts transaction. + * + * Invokes {@link DOMStoreThreePhaseCommitCohort#abort()} on all + * cohorts, blocks + * for all results. If any of the abort failed throws + * IllegalStateException, + * which will contains originalCause as suppressed Exception. + * + * If aborts we're successful throws supplied exception + * + * @param originalCause + * Exception which should be used to fail transaction for + * consumers of transaction + * future and listeners of transaction failure. + * @param phase phase in which the problem ensued + * @throws TransactionCommitFailedException + * on invocation of this method. + * originalCa + * @throws IllegalStateException + * if abort failed. + */ + private void abortBlocking(final TransactionCommitFailedException originalCause) throws TransactionCommitFailedException { + Exception cause = originalCause; + try { + abortAsyncAll().get(); + } catch (InterruptedException | ExecutionException e) { + LOG.error("Tx: {} Error during Abort.", tx.getIdentifier(), e); + cause = new IllegalStateException("Abort failed.", e); + cause.addSuppressed(e); + } + Throwables.propagateIfPossible(cause, TransactionCommitFailedException.class); + } + + /** + * Invokes abort on underlying cohorts and returns future which + * completes once all abort on cohorts are completed. + * + * @return Future which will complete once all cohorts completed + * abort. + */ + private ListenableFuture abortAsyncAll() { + + final ListenableFuture[] ops = new ListenableFuture[cohortSize]; + int i = 0; + for (DOMStoreThreePhaseCommitCohort cohort : cohorts) { + ops[i++] = cohort.abort(); + } + + /* + * We are returning all futures as list, not only succeeded ones in + * order to fail composite future if any of them failed. + * See Futures.allAsList for this description. + */ + @SuppressWarnings({ "unchecked", "rawtypes" }) + ListenableFuture compositeResult = (ListenableFuture) Futures.allAsList(ops); + return compositeResult; + } +} diff --git a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMDataCommitCoordinatorImpl.java b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMDataCommitCoordinatorImpl.java index c1ecaa67df..7b53500231 100644 --- a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMDataCommitCoordinatorImpl.java +++ b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMDataCommitCoordinatorImpl.java @@ -7,16 +7,11 @@ package org.opendaylight.controller.md.sal.dom.broker.impl; import com.google.common.base.Preconditions; -import com.google.common.base.Throwables; -import com.google.common.collect.Iterables; import com.google.common.util.concurrent.CheckedFuture; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutionException; import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException; import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction; import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; @@ -82,319 +77,4 @@ public class DOMDataCommitCoordinatorImpl implements DOMDataCommitExecutor { return MappingCheckedFuture.create(commitFuture, TransactionCommitFailedExceptionMapper.COMMIT_ERROR_MAPPER); } - - /** - * - * Phase of 3PC commit - * - * Represents phase of 3PC Commit - * - * - */ - private static enum CommitPhase { - /** - * - * Commit Coordination Task is submitted for executing - * - */ - SUBMITTED, - /** - * Commit Coordination Task is in can commit phase of 3PC - * - */ - CAN_COMMIT, - /** - * Commit Coordination Task is in pre-commit phase of 3PC - * - */ - PRE_COMMIT, - /** - * Commit Coordination Task is in commit phase of 3PC - * - */ - COMMIT, - /** - * Commit Coordination Task is in abort phase of 3PC - * - */ - ABORT - } - - /** - * Implementation of blocking three-phase commit-coordination tasks without - * support of cancellation. - */ - private static final class CommitCoordinationTask implements Callable { - private static final AtomicReferenceFieldUpdater PHASE_UPDATER = - AtomicReferenceFieldUpdater.newUpdater(CommitCoordinationTask.class, CommitPhase.class, "currentPhase"); - private final DOMDataWriteTransaction tx; - private final Iterable cohorts; - private final DurationStatisticsTracker commitStatTracker; - private final int cohortSize; - private volatile CommitPhase currentPhase = CommitPhase.SUBMITTED; - - public CommitCoordinationTask(final DOMDataWriteTransaction transaction, - final Iterable cohorts, - final DurationStatisticsTracker commitStatsTracker) { - this.tx = Preconditions.checkNotNull(transaction, "transaction must not be null"); - this.cohorts = Preconditions.checkNotNull(cohorts, "cohorts must not be null"); - this.commitStatTracker = commitStatsTracker; - this.cohortSize = Iterables.size(cohorts); - } - - @Override - public Void call() throws TransactionCommitFailedException { - final long startTime = commitStatTracker != null ? System.nanoTime() : 0; - - try { - canCommitBlocking(); - preCommitBlocking(); - commitBlocking(); - return null; - } catch (TransactionCommitFailedException e) { - final CommitPhase phase = currentPhase; - LOG.warn("Tx: {} Error during phase {}, starting Abort", tx.getIdentifier(), phase, e); - abortBlocking(e, phase); - throw e; - } finally { - if (commitStatTracker != null) { - commitStatTracker.addDuration(System.nanoTime() - startTime); - } - } - } - - /** - * - * Invokes canCommit on underlying cohorts and blocks till - * all results are returned. - * - * Valid state transition is from SUBMITTED to CAN_COMMIT, - * if currentPhase is not SUBMITTED throws IllegalStateException. - * - * @throws TransactionCommitFailedException - * If one of cohorts failed can Commit - * - */ - private void canCommitBlocking() throws TransactionCommitFailedException { - for (ListenableFuture canCommit : canCommitAll()) { - try { - final Boolean result = (Boolean)canCommit.get(); - if (result == null || !result) { - throw new TransactionCommitFailedException("Can Commit failed, no detailed cause available."); - } - } catch (InterruptedException | ExecutionException e) { - throw TransactionCommitFailedExceptionMapper.CAN_COMMIT_ERROR_MAPPER.apply(e); - } - } - } - - /** - * - * Invokes canCommit on underlying cohorts and returns composite future - * which will contains {@link Boolean#TRUE} only and only if - * all cohorts returned true. - * - * Valid state transition is from SUBMITTED to CAN_COMMIT, - * if currentPhase is not SUBMITTED throws IllegalStateException. - * - * @return List of all cohorts futures from can commit phase. - * - */ - private ListenableFuture[] canCommitAll() { - changeStateFrom(CommitPhase.SUBMITTED, CommitPhase.CAN_COMMIT); - - final ListenableFuture[] ops = new ListenableFuture[cohortSize]; - int i = 0; - for (DOMStoreThreePhaseCommitCohort cohort : cohorts) { - ops[i++] = cohort.canCommit(); - } - return ops; - } - - /** - * - * Invokes preCommit on underlying cohorts and blocks till - * all results are returned. - * - * Valid state transition is from CAN_COMMIT to PRE_COMMIT, if current - * state is not CAN_COMMIT - * throws IllegalStateException. - * - * @throws TransactionCommitFailedException - * If one of cohorts failed preCommit - * - */ - private void preCommitBlocking() throws TransactionCommitFailedException { - final ListenableFuture[] preCommitFutures = preCommitAll(); - try { - for(ListenableFuture future : preCommitFutures) { - future.get(); - } - } catch (InterruptedException | ExecutionException e) { - throw TransactionCommitFailedExceptionMapper.PRE_COMMIT_MAPPER.apply(e); - } - } - - /** - * - * Invokes preCommit on underlying cohorts and returns future - * which will complete once all preCommit on cohorts completed or - * failed. - * - * - * Valid state transition is from CAN_COMMIT to PRE_COMMIT, if current - * state is not CAN_COMMIT - * throws IllegalStateException. - * - * @return List of all cohorts futures from can commit phase. - * - */ - private ListenableFuture[] preCommitAll() { - changeStateFrom(CommitPhase.CAN_COMMIT, CommitPhase.PRE_COMMIT); - - final ListenableFuture[] ops = new ListenableFuture[cohortSize]; - int i = 0; - for (DOMStoreThreePhaseCommitCohort cohort : cohorts) { - ops[i++] = cohort.preCommit(); - } - return ops; - } - - /** - * - * Invokes commit on underlying cohorts and blocks till - * all results are returned. - * - * Valid state transition is from PRE_COMMIT to COMMIT, if not throws - * IllegalStateException. - * - * @throws TransactionCommitFailedException - * If one of cohorts failed preCommit - * - */ - private void commitBlocking() throws TransactionCommitFailedException { - final ListenableFuture[] commitFutures = commitAll(); - try { - for(ListenableFuture future : commitFutures) { - future.get(); - } - } catch (InterruptedException | ExecutionException e) { - throw TransactionCommitFailedExceptionMapper.COMMIT_ERROR_MAPPER.apply(e); - } - } - - /** - * - * Invokes commit on underlying cohorts and returns future which - * completes - * once all commits on cohorts are completed. - * - * Valid state transition is from PRE_COMMIT to COMMIT, if not throws - * IllegalStateException - * - * @return List of all cohorts futures from can commit phase. - * - */ - private ListenableFuture[] commitAll() { - changeStateFrom(CommitPhase.PRE_COMMIT, CommitPhase.COMMIT); - - final ListenableFuture[] ops = new ListenableFuture[cohortSize]; - int i = 0; - for (DOMStoreThreePhaseCommitCohort cohort : cohorts) { - ops[i++] = cohort.commit(); - } - return ops; - } - - /** - * Aborts transaction. - * - * Invokes {@link DOMStoreThreePhaseCommitCohort#abort()} on all - * cohorts, blocks - * for all results. If any of the abort failed throws - * IllegalStateException, - * which will contains originalCause as suppressed Exception. - * - * If aborts we're successful throws supplied exception - * - * @param originalCause - * Exception which should be used to fail transaction for - * consumers of transaction - * future and listeners of transaction failure. - * @param phase phase in which the problem ensued - * @throws TransactionCommitFailedException - * on invocation of this method. - * originalCa - * @throws IllegalStateException - * if abort failed. - */ - private void abortBlocking(final TransactionCommitFailedException originalCause, final CommitPhase phase) - throws TransactionCommitFailedException { - LOG.warn("Tx: {} Error during phase {}, starting Abort", tx.getIdentifier(), phase, originalCause); - Exception cause = originalCause; - try { - abortAsyncAll(phase).get(); - } catch (InterruptedException | ExecutionException e) { - LOG.error("Tx: {} Error during Abort.", tx.getIdentifier(), e); - cause = new IllegalStateException("Abort failed.", e); - cause.addSuppressed(e); - } - Throwables.propagateIfPossible(cause, TransactionCommitFailedException.class); - } - - /** - * Invokes abort on underlying cohorts and returns future which - * completes once all abort on cohorts are completed. - * - * @param phase phase in which the problem ensued - * @return Future which will complete once all cohorts completed - * abort. - */ - private ListenableFuture abortAsyncAll(final CommitPhase phase) { - changeStateFrom(phase, CommitPhase.ABORT); - - final ListenableFuture[] ops = new ListenableFuture[cohortSize]; - int i = 0; - for (DOMStoreThreePhaseCommitCohort cohort : cohorts) { - ops[i++] = cohort.abort(); - } - - /* - * We are returning all futures as list, not only succeeded ones in - * order to fail composite future if any of them failed. - * See Futures.allAsList for this description. - */ - @SuppressWarnings({ "unchecked", "rawtypes" }) - ListenableFuture compositeResult = (ListenableFuture) Futures.allAsList(ops); - return compositeResult; - } - - /** - * Change phase / state of transaction from expected value to new value - * - * This method checks state and updates state to new state of - * of this task if current state equals expected state. - * If expected state and current state are different raises - * IllegalStateException - * which means there is probably bug in implementation of commit - * coordination. - * - * If transition is successful, it logs transition on DEBUG level. - * - * @param currentExpected - * Required phase for change of state - * @param newState - * New Phase which will be entered by transaction. - * @throws IllegalStateException - * If currentState of task does not match expected state - */ - private void changeStateFrom(final CommitPhase currentExpected, final CommitPhase newState) { - final boolean success = PHASE_UPDATER.compareAndSet(this, currentExpected, newState); - Preconditions.checkState(success, "Invalid state transition: Tx: %s expected: %s current: %s target: %s", - tx.getIdentifier(), currentExpected, currentPhase, newState); - - LOG.debug("Transaction {}: Phase {} Started", tx.getIdentifier(), newState); - }; - } - } -- 2.36.6