2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
3 * This program and the accompanying materials are made available under the
4 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
5 * and is available at http://www.eclipse.org/legal/epl-v10.html
7 package org.opendaylight.controller.md.sal.dom.broker.impl;
9 import com.google.common.base.Preconditions;
10 import com.google.common.base.Throwables;
11 import com.google.common.util.concurrent.Futures;
12 import com.google.common.util.concurrent.ListenableFuture;
13 import java.util.Collection;
14 import java.util.concurrent.Callable;
15 import java.util.concurrent.ExecutionException;
16 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
17 import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
18 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
19 import org.opendaylight.yangtools.util.DurationStatisticsTracker;
20 import org.slf4j.Logger;
21 import org.slf4j.LoggerFactory;
24 * Implementation of blocking three-phase commit-coordination tasks without
25 * support of cancellation.
27 final class CommitCoordinationTask implements Callable<Void> {
28 private static enum Phase {
34 private static final Logger LOG = LoggerFactory.getLogger(CommitCoordinationTask.class);
35 private final Collection<DOMStoreThreePhaseCommitCohort> cohorts;
36 private final DurationStatisticsTracker commitStatTracker;
37 private final DOMDataWriteTransaction tx;
39 public CommitCoordinationTask(final DOMDataWriteTransaction transaction,
40 final Collection<DOMStoreThreePhaseCommitCohort> cohorts,
41 final DurationStatisticsTracker commitStatTracker) {
42 this.tx = Preconditions.checkNotNull(transaction, "transaction must not be null");
43 this.cohorts = Preconditions.checkNotNull(cohorts, "cohorts must not be null");
44 this.commitStatTracker = commitStatTracker;
48 public Void call() throws TransactionCommitFailedException {
49 final long startTime = commitStatTracker != null ? System.nanoTime() : 0;
51 Phase phase = Phase.canCommit;
54 LOG.debug("Transaction {}: canCommit Started", tx.getIdentifier());
57 phase = Phase.preCommit;
58 LOG.debug("Transaction {}: preCommit Started", tx.getIdentifier());
61 phase = Phase.doCommit;
62 LOG.debug("Transaction {}: doCommit Started", tx.getIdentifier());
65 LOG.debug("Transaction {}: doCommit completed", tx.getIdentifier());
67 } catch (TransactionCommitFailedException e) {
68 LOG.warn("Tx: {} Error during phase {}, starting Abort", tx.getIdentifier(), phase, e);
72 if (commitStatTracker != null) {
73 commitStatTracker.addDuration(System.nanoTime() - startTime);
80 * Invokes canCommit on underlying cohorts and blocks till
81 * all results are returned.
83 * Valid state transition is from SUBMITTED to CAN_COMMIT,
84 * if currentPhase is not SUBMITTED throws IllegalStateException.
86 * @throws TransactionCommitFailedException
87 * If one of cohorts failed can Commit
90 private void canCommitBlocking() throws TransactionCommitFailedException {
91 for (ListenableFuture<?> canCommit : canCommitAll()) {
93 final Boolean result = (Boolean)canCommit.get();
94 if (result == null || !result) {
95 throw new TransactionCommitFailedException("Can Commit failed, no detailed cause available.");
97 } catch (InterruptedException | ExecutionException e) {
98 throw TransactionCommitFailedExceptionMapper.CAN_COMMIT_ERROR_MAPPER.apply(e);
105 * Invokes canCommit on underlying cohorts and returns composite future
106 * which will contains {@link Boolean#TRUE} only and only if
107 * all cohorts returned true.
109 * Valid state transition is from SUBMITTED to CAN_COMMIT,
110 * if currentPhase is not SUBMITTED throws IllegalStateException.
112 * @return List of all cohorts futures from can commit phase.
115 private ListenableFuture<?>[] canCommitAll() {
116 final ListenableFuture<?>[] ops = new ListenableFuture<?>[cohorts.size()];
118 for (DOMStoreThreePhaseCommitCohort cohort : cohorts) {
119 ops[i++] = cohort.canCommit();
126 * Invokes preCommit on underlying cohorts and blocks till
127 * all results are returned.
129 * Valid state transition is from CAN_COMMIT to PRE_COMMIT, if current
130 * state is not CAN_COMMIT
131 * throws IllegalStateException.
133 * @throws TransactionCommitFailedException
134 * If one of cohorts failed preCommit
137 private void preCommitBlocking() throws TransactionCommitFailedException {
138 final ListenableFuture<?>[] preCommitFutures = preCommitAll();
140 for(ListenableFuture<?> future : preCommitFutures) {
143 } catch (InterruptedException | ExecutionException e) {
144 throw TransactionCommitFailedExceptionMapper.PRE_COMMIT_MAPPER.apply(e);
150 * Invokes preCommit on underlying cohorts and returns future
151 * which will complete once all preCommit on cohorts completed or
155 * Valid state transition is from CAN_COMMIT to PRE_COMMIT, if current
156 * state is not CAN_COMMIT
157 * throws IllegalStateException.
159 * @return List of all cohorts futures from can commit phase.
162 private ListenableFuture<?>[] preCommitAll() {
163 final ListenableFuture<?>[] ops = new ListenableFuture<?>[cohorts.size()];
165 for (DOMStoreThreePhaseCommitCohort cohort : cohorts) {
166 ops[i++] = cohort.preCommit();
173 * Invokes commit on underlying cohorts and blocks till
174 * all results are returned.
176 * Valid state transition is from PRE_COMMIT to COMMIT, if not throws
177 * IllegalStateException.
179 * @throws TransactionCommitFailedException
180 * If one of cohorts failed preCommit
183 private void commitBlocking() throws TransactionCommitFailedException {
184 final ListenableFuture<?>[] commitFutures = commitAll();
186 for(ListenableFuture<?> future : commitFutures) {
189 } catch (InterruptedException | ExecutionException e) {
190 throw TransactionCommitFailedExceptionMapper.COMMIT_ERROR_MAPPER.apply(e);
196 * Invokes commit on underlying cohorts and returns future which
198 * once all commits on cohorts are completed.
200 * Valid state transition is from PRE_COMMIT to COMMIT, if not throws
201 * IllegalStateException
203 * @return List of all cohorts futures from can commit phase.
205 private ListenableFuture<?>[] commitAll() {
206 final ListenableFuture<?>[] ops = new ListenableFuture<?>[cohorts.size()];
208 for (DOMStoreThreePhaseCommitCohort cohort : cohorts) {
209 ops[i++] = cohort.commit();
215 * Aborts transaction.
217 * Invokes {@link DOMStoreThreePhaseCommitCohort#abort()} on all
219 * for all results. If any of the abort failed throws
220 * IllegalStateException,
221 * which will contains originalCause as suppressed Exception.
223 * If aborts we're successful throws supplied exception
225 * @param originalCause
226 * Exception which should be used to fail transaction for
227 * consumers of transaction
228 * future and listeners of transaction failure.
229 * @param phase phase in which the problem ensued
230 * @throws TransactionCommitFailedException
231 * on invocation of this method.
233 * @throws IllegalStateException
236 private void abortBlocking(final TransactionCommitFailedException originalCause) throws TransactionCommitFailedException {
237 Exception cause = originalCause;
239 abortAsyncAll().get();
240 } catch (InterruptedException | ExecutionException e) {
241 LOG.error("Tx: {} Error during Abort.", tx.getIdentifier(), e);
242 cause = new IllegalStateException("Abort failed.", e);
243 cause.addSuppressed(e);
245 Throwables.propagateIfPossible(cause, TransactionCommitFailedException.class);
249 * Invokes abort on underlying cohorts and returns future which
250 * completes once all abort on cohorts are completed.
252 * @return Future which will complete once all cohorts completed
255 private ListenableFuture<Void> abortAsyncAll() {
257 final ListenableFuture<?>[] ops = new ListenableFuture<?>[cohorts.size()];
259 for (DOMStoreThreePhaseCommitCohort cohort : cohorts) {
260 ops[i++] = cohort.abort();
264 * We are returning all futures as list, not only succeeded ones in
265 * order to fail composite future if any of them failed.
266 * See Futures.allAsList for this description.
268 @SuppressWarnings({ "unchecked", "rawtypes" })
269 ListenableFuture<Void> compositeResult = (ListenableFuture) Futures.allAsList(ops);
270 return compositeResult;