2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.md.sal.dom.broker.impl;
11 import com.google.common.base.Preconditions;
12 import com.google.common.base.Supplier;
13 import com.google.common.base.Throwables;
14 import com.google.common.util.concurrent.Futures;
15 import com.google.common.util.concurrent.ListenableFuture;
16 import java.util.Collection;
17 import java.util.concurrent.Callable;
18 import java.util.concurrent.ExecutionException;
19 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
20 import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
21 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
22 import org.opendaylight.yangtools.util.DurationStatisticsTracker;
23 import org.slf4j.Logger;
24 import org.slf4j.LoggerFactory;
27 * Implementation of blocking three-phase commit-coordination tasks without
28 * support of cancellation.
31 final class CommitCoordinationTask<T> implements Callable<T> {
38 private static final Logger LOG = LoggerFactory.getLogger(CommitCoordinationTask.class);
39 private final Collection<DOMStoreThreePhaseCommitCohort> cohorts;
40 private final DurationStatisticsTracker commitStatTracker;
41 private final DOMDataWriteTransaction tx;
42 private final Supplier<T> futureValueSupplier;
/**
 * Creates a task which coordinates the three-phase commit of a single transaction.
 *
 * @param transaction transaction being committed, must not be null
 * @param cohorts cohorts taking part in the commit, must not be null
 * @param commitStatTracker tracker which receives the elapsed time of {@link #call()};
 *        may be null, in which case no duration is recorded
 * @param futureValueSupplier supplier of the value returned from {@link #call()} once the
 *        commit phase has completed, must not be null
 * @throws NullPointerException if transaction, cohorts or futureValueSupplier is null
 */
CommitCoordinationTask(final DOMDataWriteTransaction transaction,
        final Collection<DOMStoreThreePhaseCommitCohort> cohorts,
        final DurationStatisticsTracker commitStatTracker,
        final Supplier<T> futureValueSupplier) {
    this.tx = Preconditions.checkNotNull(transaction, "transaction must not be null");
    this.cohorts = Preconditions.checkNotNull(cohorts, "cohorts must not be null");
    // Intentionally nullable: call() checks for null before touching the tracker.
    this.commitStatTracker = commitStatTracker;
    // Fail fast. call() dereferences the supplier only after every cohort has committed, so a
    // null supplier would otherwise surface as a NullPointerException on an already-committed
    // transaction.
    this.futureValueSupplier = Preconditions.checkNotNull(futureValueSupplier,
            "futureValueSupplier must not be null");
}
55 public T call() throws TransactionCommitFailedException {
56 final long startTime = commitStatTracker != null ? System.nanoTime() : 0;
58 Phase phase = Phase.canCommit;
61 LOG.debug("Transaction {}: canCommit Started", tx.getIdentifier());
64 phase = Phase.preCommit;
65 LOG.debug("Transaction {}: preCommit Started", tx.getIdentifier());
68 phase = Phase.doCommit;
69 LOG.debug("Transaction {}: doCommit Started", tx.getIdentifier());
72 LOG.debug("Transaction {}: doCommit completed", tx.getIdentifier());
73 return futureValueSupplier.get();
74 } catch (final TransactionCommitFailedException e) {
75 LOG.warn("Tx: {} Error during phase {}, starting Abort", tx.getIdentifier(), phase, e);
79 if (commitStatTracker != null) {
80 commitStatTracker.addDuration(System.nanoTime() - startTime);
86 * Invokes canCommit on underlying cohorts and blocks till
87 * all results are returned.
90 * Valid state transition is from SUBMITTED to CAN_COMMIT,
91 * if currentPhase is not SUBMITTED throws IllegalStateException.
93 * @throws TransactionCommitFailedException
94 * If one of cohorts failed can Commit
97 private void canCommitBlocking() throws TransactionCommitFailedException {
98 for (final ListenableFuture<?> canCommit : canCommitAll()) {
100 final Boolean result = (Boolean)canCommit.get();
101 if (result == null || !result) {
102 throw new TransactionCommitFailedException("Can Commit failed, no detailed cause available.");
104 } catch (InterruptedException | ExecutionException e) {
105 throw TransactionCommitFailedExceptionMapper.CAN_COMMIT_ERROR_MAPPER.apply(e);
111 * Invokes canCommit on underlying cohorts and returns composite future
112 * which will contains {@link Boolean#TRUE} only and only if
113 * all cohorts returned true.
116 * Valid state transition is from SUBMITTED to CAN_COMMIT,
117 * if currentPhase is not SUBMITTED throws IllegalStateException.
119 * @return List of all cohorts futures from can commit phase.
122 private ListenableFuture<?>[] canCommitAll() {
123 final ListenableFuture<?>[] ops = new ListenableFuture<?>[cohorts.size()];
125 for (final DOMStoreThreePhaseCommitCohort cohort : cohorts) {
126 ops[index++] = cohort.canCommit();
132 * Invokes preCommit on underlying cohorts and blocks till
133 * all results are returned.
136 * Valid state transition is from CAN_COMMIT to PRE_COMMIT, if current
137 * state is not CAN_COMMIT
138 * throws IllegalStateException.
140 * @throws TransactionCommitFailedException
141 * If one of cohorts failed preCommit
144 private void preCommitBlocking() throws TransactionCommitFailedException {
145 final ListenableFuture<?>[] preCommitFutures = preCommitAll();
147 for (final ListenableFuture<?> future : preCommitFutures) {
150 } catch (InterruptedException | ExecutionException e) {
151 throw TransactionCommitFailedExceptionMapper.PRE_COMMIT_MAPPER.apply(e);
156 * Invokes preCommit on underlying cohorts and returns future
157 * which will complete once all preCommit on cohorts completed or
161 * Valid state transition is from CAN_COMMIT to PRE_COMMIT, if current
162 * state is not CAN_COMMIT
163 * throws IllegalStateException.
165 * @return List of all cohorts futures from can commit phase.
168 private ListenableFuture<?>[] preCommitAll() {
169 final ListenableFuture<?>[] ops = new ListenableFuture<?>[cohorts.size()];
171 for (final DOMStoreThreePhaseCommitCohort cohort : cohorts) {
172 ops[index++] = cohort.preCommit();
178 * Invokes commit on underlying cohorts and blocks till
179 * all results are returned.
182 * Valid state transition is from PRE_COMMIT to COMMIT, if not throws
183 * IllegalStateException.
185 * @throws TransactionCommitFailedException
186 * If one of cohorts failed preCommit
189 private void commitBlocking() throws TransactionCommitFailedException {
190 final ListenableFuture<?>[] commitFutures = commitAll();
192 for (final ListenableFuture<?> future : commitFutures) {
195 } catch (InterruptedException | ExecutionException e) {
196 throw TransactionCommitFailedExceptionMapper.COMMIT_ERROR_MAPPER.apply(e);
201 * Invokes commit on underlying cohorts and returns future which
203 * once all commits on cohorts are completed.
206 * Valid state transition is from PRE_COMMIT to COMMIT, if not throws
207 * IllegalStateException
209 * @return List of all cohorts futures from can commit phase.
211 private ListenableFuture<?>[] commitAll() {
212 final ListenableFuture<?>[] ops = new ListenableFuture<?>[cohorts.size()];
214 for (final DOMStoreThreePhaseCommitCohort cohort : cohorts) {
215 ops[index++] = cohort.commit();
221 * Aborts transaction.
224 * Invokes {@link DOMStoreThreePhaseCommitCohort#abort()} on all
226 * for all results. If any of the abort failed throws
227 * IllegalStateException,
228 * which will contains originalCause as suppressed Exception.
231 * If aborts we're successful throws supplied exception
233 * @param originalCause
234 * Exception which should be used to fail transaction for
235 * consumers of transaction
236 * future and listeners of transaction failure.
237 * @param phase phase in which the problem ensued
238 * @throws TransactionCommitFailedException
239 * on invocation of this method.
241 * @throws IllegalStateException
244 private void abortBlocking(
245 final TransactionCommitFailedException originalCause) throws TransactionCommitFailedException {
246 Exception cause = originalCause;
248 abortAsyncAll().get();
249 } catch (InterruptedException | ExecutionException e) {
250 LOG.error("Tx: {} Error during Abort.", tx.getIdentifier(), e);
251 cause = new IllegalStateException("Abort failed.", e);
252 cause.addSuppressed(e);
254 Throwables.propagateIfPossible(cause, TransactionCommitFailedException.class);
258 * Invokes abort on underlying cohorts and returns future which
259 * completes once all abort on cohorts are completed.
261 * @return Future which will complete once all cohorts completed
264 @SuppressWarnings({"unchecked", "rawtypes"})
265 private ListenableFuture<Void> abortAsyncAll() {
267 final ListenableFuture<?>[] ops = new ListenableFuture<?>[cohorts.size()];
269 for (final DOMStoreThreePhaseCommitCohort cohort : cohorts) {
270 ops[index++] = cohort.abort();
274 * We are returning all futures as list, not only succeeded ones in
275 * order to fail composite future if any of them failed.
276 * See Futures.allAsList for this description.
278 return (ListenableFuture) Futures.allAsList(ops);