X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-dom-broker%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fmd%2Fsal%2Fdom%2Fbroker%2Fimpl%2FDOMDataCommitCoordinatorImpl.java;fp=opendaylight%2Fmd-sal%2Fsal-dom-broker%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fmd%2Fsal%2Fdom%2Fbroker%2Fimpl%2FDOMDataCommitCoordinatorImpl.java;h=540e2fe20ce52208ffd4a659b5e21c70eafe4b10;hb=2cf314976a3f81c7115f94c44fb37e967f8a4426;hp=0000000000000000000000000000000000000000;hpb=9d7a3392097e4c4a648327f77d7ca1a6aaf1b410;p=controller.git diff --git a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMDataCommitCoordinatorImpl.java b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMDataCommitCoordinatorImpl.java new file mode 100644 index 0000000000..540e2fe20c --- /dev/null +++ b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMDataCommitCoordinatorImpl.java @@ -0,0 +1,409 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.md.sal.dom.broker.impl; + +import java.util.Collections; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; + +import javax.annotation.concurrent.GuardedBy; + +import org.opendaylight.controller.md.sal.common.api.TransactionStatus; +import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException; +import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction; +import org.opendaylight.controller.sal.common.util.Rpcs; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; +import org.opendaylight.yangtools.yang.common.RpcError; +import org.opendaylight.yangtools.yang.common.RpcResult; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Function; +import com.google.common.base.Optional; +import com.google.common.base.Preconditions; +import com.google.common.base.Throwables; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableList.Builder; +import com.google.common.util.concurrent.CheckedFuture; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; + +/** + * + * Implementation of blocking three phase commit coordinator, which which + * supports coordination on multiple {@link DOMStoreThreePhaseCommitCohort}. + * + * This implementation does not support cancelation of commit, + * + * In order to advance to next phase of three phase commit all subtasks of + * previous step must be finish. + * + * This executor does not have an upper bound on subtask timeout. + * + * + */ +public class DOMDataCommitCoordinatorImpl implements DOMDataCommitExecutor { + + private static final Logger LOG = LoggerFactory.getLogger(DOMDataCommitCoordinatorImpl.class); + + /** + * Runs AND binary operation between all booleans in supplied iteration of booleans. + * + * This method will stop evaluating iterables if first found is false. + */ + private static final Function, Boolean> AND_FUNCTION = new Function, Boolean>() { + + @Override + public Boolean apply(final Iterable input) { + for(boolean value : input) { + if(!value) { + return Boolean.FALSE; + } + } + return Boolean.TRUE; + } + }; + + private final ListeningExecutorService executor; + + /** + * + * Construct DOMDataCommitCoordinator which uses supplied executor to + * process commit coordinations. + * + * @param executor + */ + public DOMDataCommitCoordinatorImpl(final ListeningExecutorService executor) { + this.executor = Preconditions.checkNotNull(executor, "executor must not be null."); + } + + @Override + public ListenableFuture> submit(final DOMDataWriteTransaction transaction, + final Iterable cohorts, final Optional listener) { + Preconditions.checkArgument(transaction != null, "Transaction must not be null."); + Preconditions.checkArgument(cohorts != null, "Cohorts must not be null."); + Preconditions.checkArgument(listener != null, "Listener must not be null"); + LOG.debug("Tx: {} is submitted for execution.", transaction.getIdentifier()); + ListenableFuture> commitFuture = executor.submit(new CommitCoordinationTask( + transaction, cohorts, listener)); + if (listener.isPresent()) { + Futures.addCallback(commitFuture, new DOMDataCommitErrorInvoker(transaction, listener.get())); + } + return commitFuture; + } + + /** + * + * Phase of 3PC commit + * + * Represents phase of 3PC Commit + * + * + */ + private static enum CommitPhase { + /** + * + * Commit Coordination Task is submitted for executing + * + */ + SUBMITTED, + /** + * Commit Coordination Task is in can commit phase of 3PC + * + */ + CAN_COMMIT, + /** + * Commit Coordination Task is in pre-commit phase of 3PC + * + */ + PRE_COMMIT, + /** + * Commit Coordination Task is in commit phase of 3PC + * + */ + COMMIT, + /** + * Commit Coordination Task is in abort phase of 3PC + * + */ + ABORT + } + + /** + * + * Implementation of blocking three-phase commit-coordination tasks without + * support of cancelation. + * + */ + private static class CommitCoordinationTask implements Callable> { + + private final DOMDataWriteTransaction tx; + private final Iterable cohorts; + + @GuardedBy("this") + private CommitPhase currentPhase; + + public CommitCoordinationTask(final DOMDataWriteTransaction transaction, + final Iterable cohorts, + final Optional listener) { + this.tx = Preconditions.checkNotNull(transaction, "transaction must not be null"); + this.cohorts = Preconditions.checkNotNull(cohorts, "cohorts must not be null"); + this.currentPhase = CommitPhase.SUBMITTED; + } + + @Override + public RpcResult call() throws TransactionCommitFailedException { + + try { + canCommitBlocking(); + preCommitBlocking(); + return commitBlocking(); + } catch (TransactionCommitFailedException e) { + LOG.warn("Tx: {} Error during phase {}, starting Abort", tx.getIdentifier(), currentPhase, e); + abortBlocking(e); + throw e; + } + } + + /** + * + * Invokes canCommit on underlying cohorts and blocks till + * all results are returned. + * + * Valid state transition is from SUBMITTED to CAN_COMMIT, + * if currentPhase is not SUBMITTED throws IllegalStateException. + * + * @throws TransactionCommitFailedException + * If one of cohorts failed can Commit + * + */ + private void canCommitBlocking() throws TransactionCommitFailedException { + final Boolean canCommitResult = canCommitAll().checkedGet(); + if (!canCommitResult) { + throw new TransactionCommitFailedException("Can Commit failed, no detailed cause available."); + } + } + + /** + * + * Invokes preCommit on underlying cohorts and blocks till + * all results are returned. + * + * Valid state transition is from CAN_COMMIT to PRE_COMMIT, if current + * state is not CAN_COMMIT + * throws IllegalStateException. + * + * @throws TransactionCommitFailedException + * If one of cohorts failed preCommit + * + */ + private void preCommitBlocking() throws TransactionCommitFailedException { + preCommitAll().checkedGet(); + } + + /** + * + * Invokes commit on underlying cohorts and blocks till + * all results are returned. + * + * Valid state transition is from PRE_COMMIT to COMMIT, if not throws + * IllegalStateException. + * + * @throws TransactionCommitFailedException + * If one of cohorts failed preCommit + * + */ + private RpcResult commitBlocking() throws TransactionCommitFailedException { + commitAll().checkedGet(); + return Rpcs.getRpcResult(true, TransactionStatus.COMMITED, Collections. emptySet()); + } + + /** + * Aborts transaction. + * + * Invokes {@link DOMStoreThreePhaseCommitCohort#abort()} on all + * cohorts, blocks + * for all results. If any of the abort failed throws + * IllegalStateException, + * which will contains originalCause as suppressed Exception. + * + * If aborts we're successful throws supplied exception + * + * @param originalCause + * Exception which should be used to fail transaction for + * consumers of transaction + * future and listeners of transaction failure. + * @throws TransactionCommitFailedException + * on invocation of this method. + * originalCa + * @throws IllegalStateException + * if abort failed. + */ + private void abortBlocking(final TransactionCommitFailedException originalCause) + throws TransactionCommitFailedException { + LOG.warn("Tx: {} Error during phase {}, starting Abort", tx.getIdentifier(), currentPhase, originalCause); + Exception cause = originalCause; + try { + abortAsyncAll().get(); + } catch (InterruptedException | ExecutionException e) { + LOG.error("Tx: {} Error during Abort.", tx.getIdentifier(), e); + cause = new IllegalStateException("Abort failed.", e); + cause.addSuppressed(e); + } + Throwables.propagateIfPossible(cause, TransactionCommitFailedException.class); + } + + /** + * + * Invokes preCommit on underlying cohorts and returns future + * which will complete once all preCommit on cohorts completed or + * failed. + * + * + * Valid state transition is from CAN_COMMIT to PRE_COMMIT, if current + * state is not CAN_COMMIT + * throws IllegalStateException. + * + * @return Future which will complete once all cohorts completed + * preCommit. + * Future throws TransactionCommitFailedException + * If any of cohorts failed preCommit + * + */ + private CheckedFuture preCommitAll() { + changeStateFrom(CommitPhase.CAN_COMMIT, CommitPhase.PRE_COMMIT); + Builder> ops = ImmutableList.builder(); + for (DOMStoreThreePhaseCommitCohort cohort : cohorts) { + ops.add(cohort.preCommit()); + } + /* + * We are returing all futures as list, not only succeeded ones in + * order to fail composite future if any of them failed. + * See Futures.allAsList for this description. + */ + @SuppressWarnings({ "unchecked", "rawtypes" }) + ListenableFuture compositeResult = (ListenableFuture) Futures.allAsList(ops.build()); + return Futures.makeChecked(compositeResult, TransactionCommitFailedExceptionMapper.PRE_COMMIT_MAPPER); + } + + /** + * + * Invokes commit on underlying cohorts and returns future which + * completes + * once all commits on cohorts are completed. + * + * Valid state transition is from PRE_COMMIT to COMMIT, if not throws + * IllegalStateException + * + * @return Future which will complete once all cohorts completed + * commit. + * Future throws TransactionCommitFailedException + * If any of cohorts failed preCommit + * + */ + private CheckedFuture commitAll() { + changeStateFrom(CommitPhase.PRE_COMMIT, CommitPhase.COMMIT); + Builder> ops = ImmutableList.builder(); + for (DOMStoreThreePhaseCommitCohort cohort : cohorts) { + ops.add(cohort.commit()); + } + /* + * We are returing all futures as list, not only succeeded ones in + * order to fail composite future if any of them failed. + * See Futures.allAsList for this description. + */ + @SuppressWarnings({ "unchecked", "rawtypes" }) + ListenableFuture compositeResult = (ListenableFuture) Futures.allAsList(ops.build()); + return Futures.makeChecked(compositeResult, TransactionCommitFailedExceptionMapper.COMMIT_ERROR_MAPPER); + } + + /** + * + * Invokes canCommit on underlying cohorts and returns composite future + * which will contains {@link Boolean#TRUE} only and only if + * all cohorts returned true. + * + * Valid state transition is from SUBMITTED to CAN_COMMIT, + * if currentPhase is not SUBMITTED throws IllegalStateException. + * + * @return Future which will complete once all cohorts completed + * preCommit. + * Future throws TransactionCommitFailedException + * If any of cohorts failed preCommit + * + */ + private CheckedFuture canCommitAll() { + changeStateFrom(CommitPhase.SUBMITTED, CommitPhase.CAN_COMMIT); + Builder> canCommitOperations = ImmutableList.builder(); + for (DOMStoreThreePhaseCommitCohort cohort : cohorts) { + canCommitOperations.add(cohort.canCommit()); + } + ListenableFuture> allCanCommits = Futures.allAsList(canCommitOperations.build()); + ListenableFuture allSuccessFuture = Futures.transform(allCanCommits, AND_FUNCTION); + return Futures + .makeChecked(allSuccessFuture, TransactionCommitFailedExceptionMapper.CAN_COMMIT_ERROR_MAPPER); + + } + + /** + * + * Invokes abort on underlying cohorts and returns future which + * completes + * once all abort on cohorts are completed. + * + * @return Future which will complete once all cohorts completed + * abort. + * + */ + private ListenableFuture abortAsyncAll() { + changeStateFrom(currentPhase, CommitPhase.ABORT); + Builder> ops = ImmutableList.builder(); + for (DOMStoreThreePhaseCommitCohort cohort : cohorts) { + ops.add(cohort.abort()); + } + /* + * We are returing all futures as list, not only succeeded ones in + * order to fail composite future if any of them failed. + * See Futures.allAsList for this description. + */ + @SuppressWarnings({ "unchecked", "rawtypes" }) + ListenableFuture compositeResult = (ListenableFuture) Futures.allAsList(ops.build()); + return compositeResult; + } + + /** + * Change phase / state of transaction from expected value to new value + * + * This method checks state and updates state to new state of + * of this task if current state equals expected state. + * If expected state and current state are different raises + * IllegalStateException + * which means there is probably bug in implementation of commit + * coordination. + * + * If transition is successful, it logs transition on DEBUG level. + * + * @param currentExpected + * Required phase for change of state + * @param newState + * New Phase which will be entered by transaction. + * @throws IllegalStateException + * If currentState of task does not match expected state + */ + private synchronized void changeStateFrom(final CommitPhase currentExpected, final CommitPhase newState) { + Preconditions.checkState(currentPhase.equals(currentExpected), + "Invalid state transition: Tx: %s current state: %s new state: %s", tx.getIdentifier(), + currentPhase, newState); + LOG.debug("Transaction {}: Phase {} Started ", tx.getIdentifier(), newState); + currentPhase = newState; + }; + + } + +}