2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.md.sal.dom.broker.impl;
11 import com.google.common.base.Preconditions;
12 import com.google.common.base.Throwables;
13 import com.google.common.util.concurrent.Futures;
14 import com.google.common.util.concurrent.ListenableFuture;
15 import java.util.Collection;
16 import java.util.concurrent.Callable;
17 import java.util.concurrent.ExecutionException;
18 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
19 import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
20 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
21 import org.opendaylight.yangtools.util.DurationStatisticsTracker;
22 import org.slf4j.Logger;
23 import org.slf4j.LoggerFactory;
26 * Implementation of blocking three-phase commit-coordination tasks without
27 * support of cancellation.
// Blocking three-phase commit coordinator for one transaction, run as a Callable<Void>.
29 final class CommitCoordinationTask implements Callable<Void> {
// SLF4J logger for this class.
36 private static final Logger LOG = LoggerFactory.getLogger(CommitCoordinationTask.class);
// Cohorts on which canCommit, preCommit, commit and abort are invoked.
37 private final Collection<DOMStoreThreePhaseCommitCohort> cohorts;
// Tracker that receives the elapsed time of call(); it is null-checked before every use.
38 private final DurationStatisticsTracker commitStatTracker;
// Transaction being committed; its identifier is used in the log messages.
39 private final DOMDataWriteTransaction tx;
/**
 * Creates a coordination task for a single transaction.
 *
 * @param transaction transaction being committed; must not be null
 * @param cohorts cohorts taking part in the commit; must not be null
 * @param commitStatTracker tracker that receives the elapsed time of {@link #call()};
 *        it is not null-checked here, and {@link #call()} skips timing when it is null
 * @throws NullPointerException if {@code transaction} or {@code cohorts} is null
 */
CommitCoordinationTask(final DOMDataWriteTransaction transaction,
        final Collection<DOMStoreThreePhaseCommitCohort> cohorts,
        final DurationStatisticsTracker commitStatTracker) {
    // Validate in parameter order so the first null argument is the one reported.
    Preconditions.checkNotNull(transaction, "transaction must not be null");
    Preconditions.checkNotNull(cohorts, "cohorts must not be null");

    this.tx = transaction;
    this.cohorts = cohorts;
    this.commitStatTracker = commitStatTracker;
}
50 public Void call() throws TransactionCommitFailedException {
51 final long startTime = commitStatTracker != null ? System.nanoTime() : 0;
53 Phase phase = Phase.canCommit;
56 LOG.debug("Transaction {}: canCommit Started", tx.getIdentifier());
59 phase = Phase.preCommit;
60 LOG.debug("Transaction {}: preCommit Started", tx.getIdentifier());
63 phase = Phase.doCommit;
64 LOG.debug("Transaction {}: doCommit Started", tx.getIdentifier());
67 LOG.debug("Transaction {}: doCommit completed", tx.getIdentifier());
69 } catch (final TransactionCommitFailedException e) {
70 LOG.warn("Tx: {} Error during phase {}, starting Abort", tx.getIdentifier(), phase, e);
74 if (commitStatTracker != null) {
75 commitStatTracker.addDuration(System.nanoTime() - startTime);
81 * Invokes canCommit on underlying cohorts and blocks till
82 * all results are returned.
85 * Valid state transition is from SUBMITTED to CAN_COMMIT,
86 * if currentPhase is not SUBMITTED throws IllegalStateException.
88 * @throws TransactionCommitFailedException
89 * If one of cohorts failed can Commit
92 private void canCommitBlocking() throws TransactionCommitFailedException {
93 for (final ListenableFuture<?> canCommit : canCommitAll()) {
95 final Boolean result = (Boolean)canCommit.get();
96 if (result == null || !result) {
97 throw new TransactionCommitFailedException("Can Commit failed, no detailed cause available.");
99 } catch (InterruptedException | ExecutionException e) {
100 throw TransactionCommitFailedExceptionMapper.CAN_COMMIT_ERROR_MAPPER.apply(e);
106 * Invokes canCommit on underlying cohorts and returns composite future
107 * which will contains {@link Boolean#TRUE} only and only if
108 * all cohorts returned true.
111 * Valid state transition is from SUBMITTED to CAN_COMMIT,
112 * if currentPhase is not SUBMITTED throws IllegalStateException.
114 * @return List of all cohorts futures from can commit phase.
117 private ListenableFuture<?>[] canCommitAll() {
118 final ListenableFuture<?>[] ops = new ListenableFuture<?>[cohorts.size()];
120 for (final DOMStoreThreePhaseCommitCohort cohort : cohorts) {
121 ops[index++] = cohort.canCommit();
127 * Invokes preCommit on underlying cohorts and blocks till
128 * all results are returned.
131 * Valid state transition is from CAN_COMMIT to PRE_COMMIT, if current
132 * state is not CAN_COMMIT
133 * throws IllegalStateException.
135 * @throws TransactionCommitFailedException
136 * If one of cohorts failed preCommit
139 private void preCommitBlocking() throws TransactionCommitFailedException {
140 final ListenableFuture<?>[] preCommitFutures = preCommitAll();
142 for (final ListenableFuture<?> future : preCommitFutures) {
145 } catch (InterruptedException | ExecutionException e) {
146 throw TransactionCommitFailedExceptionMapper.PRE_COMMIT_MAPPER.apply(e);
151 * Invokes preCommit on underlying cohorts and returns future
152 * which will complete once all preCommit on cohorts completed or
156 * Valid state transition is from CAN_COMMIT to PRE_COMMIT, if current
157 * state is not CAN_COMMIT
158 * throws IllegalStateException.
160 * @return List of all cohorts futures from can commit phase.
163 private ListenableFuture<?>[] preCommitAll() {
164 final ListenableFuture<?>[] ops = new ListenableFuture<?>[cohorts.size()];
166 for (final DOMStoreThreePhaseCommitCohort cohort : cohorts) {
167 ops[index++] = cohort.preCommit();
173 * Invokes commit on underlying cohorts and blocks till
174 * all results are returned.
177 * Valid state transition is from PRE_COMMIT to COMMIT, if not throws
178 * IllegalStateException.
180 * @throws TransactionCommitFailedException
181 * If one of cohorts failed preCommit
184 private void commitBlocking() throws TransactionCommitFailedException {
185 final ListenableFuture<?>[] commitFutures = commitAll();
187 for (final ListenableFuture<?> future : commitFutures) {
190 } catch (InterruptedException | ExecutionException e) {
191 throw TransactionCommitFailedExceptionMapper.COMMIT_ERROR_MAPPER.apply(e);
196 * Invokes commit on underlying cohorts and returns future which
198 * once all commits on cohorts are completed.
201 * Valid state transition is from PRE_COMMIT to COMMIT, if not throws
202 * IllegalStateException
204 * @return List of all cohorts futures from can commit phase.
206 private ListenableFuture<?>[] commitAll() {
207 final ListenableFuture<?>[] ops = new ListenableFuture<?>[cohorts.size()];
209 for (final DOMStoreThreePhaseCommitCohort cohort : cohorts) {
210 ops[index++] = cohort.commit();
216 * Aborts transaction.
219 * Invokes {@link DOMStoreThreePhaseCommitCohort#abort()} on all
221 * for all results. If any of the abort failed throws
222 * IllegalStateException,
223 * which will contains originalCause as suppressed Exception.
226 * If aborts we're successful throws supplied exception
228 * @param originalCause
229 * Exception which should be used to fail transaction for
230 * consumers of transaction
231 * future and listeners of transaction failure.
232 * @param phase phase in which the problem ensued
233 * @throws TransactionCommitFailedException
234 * on invocation of this method.
236 * @throws IllegalStateException
239 private void abortBlocking(
240 final TransactionCommitFailedException originalCause) throws TransactionCommitFailedException {
241 Exception cause = originalCause;
243 abortAsyncAll().get();
244 } catch (InterruptedException | ExecutionException e) {
245 LOG.error("Tx: {} Error during Abort.", tx.getIdentifier(), e);
246 cause = new IllegalStateException("Abort failed.", e);
247 cause.addSuppressed(e);
249 Throwables.propagateIfPossible(cause, TransactionCommitFailedException.class);
253 * Invokes abort on underlying cohorts and returns future which
254 * completes once all abort on cohorts are completed.
256 * @return Future which will complete once all cohorts completed
259 @SuppressWarnings({"unchecked", "rawtypes"})
260 private ListenableFuture<Void> abortAsyncAll() {
262 final ListenableFuture<?>[] ops = new ListenableFuture<?>[cohorts.size()];
264 for (final DOMStoreThreePhaseCommitCohort cohort : cohorts) {
265 ops[index++] = cohort.abort();
269 * We are returning all futures as list, not only succeeded ones in
270 * order to fail composite future if any of them failed.
271 * See Futures.allAsList for this description.
273 return (ListenableFuture) Futures.allAsList(ops);