2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.mdsal.dom.broker;
11 import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort;
13 import org.opendaylight.mdsal.common.api.TransactionCommitFailedException;
14 import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteTransaction;
15 import com.google.common.base.Preconditions;
16 import com.google.common.base.Throwables;
17 import com.google.common.util.concurrent.Futures;
18 import com.google.common.util.concurrent.ListenableFuture;
19 import java.util.Collection;
20 import java.util.concurrent.Callable;
21 import java.util.concurrent.ExecutionException;
22 import org.opendaylight.yangtools.util.DurationStatisticsTracker;
23 import org.slf4j.Logger;
24 import org.slf4j.LoggerFactory;
27 * Implementation of blocking three-phase commit-coordination tasks without
28 * support of cancellation.
30 final class CommitCoordinationTask implements Callable<Void> {
31 private static enum Phase {
37 private static final Logger LOG = LoggerFactory.getLogger(CommitCoordinationTask.class);
38 private final Collection<DOMStoreThreePhaseCommitCohort> cohorts;
39 private final DurationStatisticsTracker commitStatTracker;
40 private final DOMDataTreeWriteTransaction tx;
42 public CommitCoordinationTask(final DOMDataTreeWriteTransaction transaction,
43 final Collection<DOMStoreThreePhaseCommitCohort> cohorts,
44 final DurationStatisticsTracker commitStatTracker) {
45 this.tx = Preconditions.checkNotNull(transaction, "transaction must not be null");
46 this.cohorts = Preconditions.checkNotNull(cohorts, "cohorts must not be null");
47 this.commitStatTracker = commitStatTracker;
51 public Void call() throws TransactionCommitFailedException {
52 final long startTime = commitStatTracker != null ? System.nanoTime() : 0;
54 Phase phase = Phase.canCommit;
57 LOG.debug("Transaction {}: canCommit Started", tx.getIdentifier());
60 phase = Phase.preCommit;
61 LOG.debug("Transaction {}: preCommit Started", tx.getIdentifier());
64 phase = Phase.doCommit;
65 LOG.debug("Transaction {}: doCommit Started", tx.getIdentifier());
68 LOG.debug("Transaction {}: doCommit completed", tx.getIdentifier());
70 } catch (final TransactionCommitFailedException e) {
71 LOG.warn("Tx: {} Error during phase {}, starting Abort", tx.getIdentifier(), phase, e);
75 if (commitStatTracker != null) {
76 commitStatTracker.addDuration(System.nanoTime() - startTime);
83 * Invokes canCommit on underlying cohorts and blocks till
84 * all results are returned.
86 * Valid state transition is from SUBMITTED to CAN_COMMIT,
87 * if currentPhase is not SUBMITTED throws IllegalStateException.
89 * @throws TransactionCommitFailedException
90 * If one of cohorts failed can Commit
93 private void canCommitBlocking() throws TransactionCommitFailedException {
94 for (final ListenableFuture<?> canCommit : canCommitAll()) {
96 final Boolean result = (Boolean)canCommit.get();
97 if (result == null || !result) {
98 throw new TransactionCommitFailedException("Can Commit failed, no detailed cause available.");
100 } catch (InterruptedException | ExecutionException e) {
101 throw TransactionCommitFailedExceptionMapper.CAN_COMMIT_ERROR_MAPPER.apply(e);
108 * Invokes canCommit on underlying cohorts and returns composite future
109 * which will contains {@link Boolean#TRUE} only and only if
110 * all cohorts returned true.
112 * Valid state transition is from SUBMITTED to CAN_COMMIT,
113 * if currentPhase is not SUBMITTED throws IllegalStateException.
115 * @return List of all cohorts futures from can commit phase.
118 private ListenableFuture<?>[] canCommitAll() {
119 final ListenableFuture<?>[] ops = new ListenableFuture<?>[cohorts.size()];
121 for (final DOMStoreThreePhaseCommitCohort cohort : cohorts) {
122 ops[i++] = cohort.canCommit();
129 * Invokes preCommit on underlying cohorts and blocks till
130 * all results are returned.
132 * Valid state transition is from CAN_COMMIT to PRE_COMMIT, if current
133 * state is not CAN_COMMIT
134 * throws IllegalStateException.
136 * @throws TransactionCommitFailedException
137 * If one of cohorts failed preCommit
140 private void preCommitBlocking() throws TransactionCommitFailedException {
141 final ListenableFuture<?>[] preCommitFutures = preCommitAll();
143 for(final ListenableFuture<?> future : preCommitFutures) {
146 } catch (InterruptedException | ExecutionException e) {
147 throw TransactionCommitFailedExceptionMapper.PRE_COMMIT_MAPPER.apply(e);
153 * Invokes preCommit on underlying cohorts and returns future
154 * which will complete once all preCommit on cohorts completed or
158 * Valid state transition is from CAN_COMMIT to PRE_COMMIT, if current
159 * state is not CAN_COMMIT
160 * throws IllegalStateException.
162 * @return List of all cohorts futures from can commit phase.
165 private ListenableFuture<?>[] preCommitAll() {
166 final ListenableFuture<?>[] ops = new ListenableFuture<?>[cohorts.size()];
168 for (final DOMStoreThreePhaseCommitCohort cohort : cohorts) {
169 ops[i++] = cohort.preCommit();
176 * Invokes commit on underlying cohorts and blocks till
177 * all results are returned.
179 * Valid state transition is from PRE_COMMIT to COMMIT, if not throws
180 * IllegalStateException.
182 * @throws TransactionCommitFailedException
183 * If one of cohorts failed preCommit
186 private void commitBlocking() throws TransactionCommitFailedException {
187 final ListenableFuture<?>[] commitFutures = commitAll();
189 for(final ListenableFuture<?> future : commitFutures) {
192 } catch (InterruptedException | ExecutionException e) {
193 throw TransactionCommitFailedExceptionMapper.COMMIT_ERROR_MAPPER.apply(e);
199 * Invokes commit on underlying cohorts and returns future which
201 * once all commits on cohorts are completed.
203 * Valid state transition is from PRE_COMMIT to COMMIT, if not throws
204 * IllegalStateException
206 * @return List of all cohorts futures from can commit phase.
208 private ListenableFuture<?>[] commitAll() {
209 final ListenableFuture<?>[] ops = new ListenableFuture<?>[cohorts.size()];
211 for (final DOMStoreThreePhaseCommitCohort cohort : cohorts) {
212 ops[i++] = cohort.commit();
218 * Aborts transaction.
220 * Invokes {@link DOMStoreThreePhaseCommitCohort#abort()} on all
222 * for all results. If any of the abort failed throws
223 * IllegalStateException,
224 * which will contains originalCause as suppressed Exception.
226 * If aborts we're successful throws supplied exception
228 * @param originalCause
229 * Exception which should be used to fail transaction for
230 * consumers of transaction
231 * future and listeners of transaction failure.
232 * @param phase phase in which the problem ensued
233 * @throws TransactionCommitFailedException
234 * on invocation of this method.
236 * @throws IllegalStateException
239 private void abortBlocking(final TransactionCommitFailedException originalCause) throws TransactionCommitFailedException {
240 Exception cause = originalCause;
242 abortAsyncAll().get();
243 } catch (InterruptedException | ExecutionException e) {
244 LOG.error("Tx: {} Error during Abort.", tx.getIdentifier(), e);
245 cause = new IllegalStateException("Abort failed.", e);
246 cause.addSuppressed(e);
248 Throwables.propagateIfPossible(cause, TransactionCommitFailedException.class);
252 * Invokes abort on underlying cohorts and returns future which
253 * completes once all abort on cohorts are completed.
255 * @return Future which will complete once all cohorts completed
258 @SuppressWarnings({"unchecked", "rawtypes"})
259 private ListenableFuture<Void> abortAsyncAll() {
261 final ListenableFuture<?>[] ops = new ListenableFuture<?>[cohorts.size()];
263 for (final DOMStoreThreePhaseCommitCohort cohort : cohorts) {
264 ops[i++] = cohort.abort();
268 * We are returning all futures as list, not only succeeded ones in
269 * order to fail composite future if any of them failed.
270 * See Futures.allAsList for this description.
272 return (ListenableFuture) Futures.allAsList(ops);