2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.mdsal.dom.broker;
10 import com.google.common.base.Preconditions;
11 import com.google.common.base.Throwables;
12 import com.google.common.util.concurrent.Futures;
13 import com.google.common.util.concurrent.ListenableFuture;
14 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
15 import java.util.Collection;
16 import java.util.concurrent.Callable;
17 import java.util.concurrent.ExecutionException;
18 import org.opendaylight.mdsal.common.api.CommitInfo;
19 import org.opendaylight.mdsal.common.api.TransactionCommitFailedException;
20 import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteTransaction;
21 import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort;
22 import org.opendaylight.yangtools.util.DurationStatisticsTracker;
23 import org.slf4j.Logger;
24 import org.slf4j.LoggerFactory;
27 * Implementation of blocking three-phase commit-coordination tasks without support of cancellation.
29 final class CommitCoordinationTask implements Callable<CommitInfo> {
36 private static final Logger LOG = LoggerFactory.getLogger(CommitCoordinationTask.class);
37 private final Collection<DOMStoreThreePhaseCommitCohort> cohorts;
38 private final DurationStatisticsTracker commitStatTracker;
39 private final DOMDataTreeWriteTransaction tx;
41 CommitCoordinationTask(final DOMDataTreeWriteTransaction transaction,
42 final Collection<DOMStoreThreePhaseCommitCohort> cohorts,
43 final DurationStatisticsTracker commitStatTracker) {
44 this.tx = Preconditions.checkNotNull(transaction, "transaction must not be null");
45 this.cohorts = Preconditions.checkNotNull(cohorts, "cohorts must not be null");
46 this.commitStatTracker = commitStatTracker;
50 public CommitInfo call() throws TransactionCommitFailedException {
51 final long startTime = commitStatTracker != null ? System.nanoTime() : 0;
53 Phase phase = Phase.CAN_COMMIT;
56 LOG.debug("Transaction {}: canCommit Started", tx.getIdentifier());
59 phase = Phase.PRE_COMMIT;
60 LOG.debug("Transaction {}: preCommit Started", tx.getIdentifier());
63 phase = Phase.DO_COMMIT;
64 LOG.debug("Transaction {}: doCommit Started", tx.getIdentifier());
67 LOG.debug("Transaction {}: doCommit completed", tx.getIdentifier());
68 return CommitInfo.empty();
69 } catch (final TransactionCommitFailedException e) {
70 LOG.warn("Tx: {} Error during phase {}, starting Abort", tx.getIdentifier(), phase, e);
74 if (commitStatTracker != null) {
75 commitStatTracker.addDuration(System.nanoTime() - startTime);
81 * Invokes canCommit on underlying cohorts and blocks till all results are returned.
84 * Valid state transition is from SUBMITTED to CAN_COMMIT, if currentPhase is not SUBMITTED throws
85 * IllegalStateException.
87 * @throws TransactionCommitFailedException If one of cohorts failed can Commit
89 @SuppressFBWarnings("BC_UNCONFIRMED_CAST_OF_RETURN_VALUE")
90 private void canCommitBlocking() throws TransactionCommitFailedException {
91 for (final ListenableFuture<?> canCommit : canCommitAll()) {
93 final Boolean result = (Boolean)canCommit.get();
94 if (result == null || !result) {
95 throw new TransactionCommitFailedException("Can Commit failed, no detailed cause available.");
97 } catch (InterruptedException | ExecutionException e) {
98 throw TransactionCommitFailedExceptionMapper.CAN_COMMIT_ERROR_MAPPER.apply(e);
104 * Invokes canCommit on underlying cohorts and returns composite future which will contain {@link Boolean#TRUE} only
105 * and only if all cohorts returned true.
108 * Valid state transition is from SUBMITTED to CAN_COMMIT, if currentPhase is not SUBMITTED throws
109 * IllegalStateException.
111 * @return List of all cohorts futures from can commit phase.
113 private ListenableFuture<?>[] canCommitAll() {
114 final ListenableFuture<?>[] ops = new ListenableFuture<?>[cohorts.size()];
116 for (final DOMStoreThreePhaseCommitCohort cohort : cohorts) {
117 ops[index++] = cohort.canCommit();
123 * Invokes preCommit on underlying cohorts and blocks until all results are returned.
126 * Valid state transition is from CAN_COMMIT to PRE_COMMIT, if current state is not CAN_COMMIT throws
127 * IllegalStateException.
129 * @throws TransactionCommitFailedException If one of cohorts failed preCommit
131 @SuppressFBWarnings("BC_UNCONFIRMED_CAST_OF_RETURN_VALUE")
132 private void preCommitBlocking() throws TransactionCommitFailedException {
133 final ListenableFuture<?>[] preCommitFutures = preCommitAll();
135 for (final ListenableFuture<?> future : preCommitFutures) {
138 } catch (InterruptedException | ExecutionException e) {
139 throw TransactionCommitFailedExceptionMapper.PRE_COMMIT_MAPPER.apply(e);
144 * Invokes preCommit on underlying cohorts and returns future which will complete once all preCommit on cohorts
145 * completed or failed.
148 * Valid state transition is from CAN_COMMIT to PRE_COMMIT, if current state is not CAN_COMMIT throws
149 * IllegalStateException.
151 * @return List of all cohorts futures from can commit phase.
153 private ListenableFuture<?>[] preCommitAll() {
154 final ListenableFuture<?>[] ops = new ListenableFuture<?>[cohorts.size()];
156 for (final DOMStoreThreePhaseCommitCohort cohort : cohorts) {
157 ops[index++] = cohort.preCommit();
163 * Invokes commit on underlying cohorts and blocks until all results are returned.
166 * Valid state transition is from PRE_COMMIT to COMMIT, if not throws IllegalStateException.
168 * @throws TransactionCommitFailedException If one of cohorts failed preCommit
170 @SuppressFBWarnings("BC_UNCONFIRMED_CAST_OF_RETURN_VALUE")
171 private void commitBlocking() throws TransactionCommitFailedException {
172 final ListenableFuture<?>[] commitFutures = commitAll();
174 for (final ListenableFuture<?> future : commitFutures) {
177 } catch (InterruptedException | ExecutionException e) {
178 throw TransactionCommitFailedExceptionMapper.COMMIT_ERROR_MAPPER.apply(e);
183 * Invokes commit on underlying cohorts and returns future which
185 * once all commits on cohorts are completed.
188 * Valid state transition is from PRE_COMMIT to COMMIT, if not throws
189 * IllegalStateException
191 * @return List of all cohorts futures from can commit phase.
193 private ListenableFuture<?>[] commitAll() {
194 final ListenableFuture<?>[] ops = new ListenableFuture<?>[cohorts.size()];
196 for (final DOMStoreThreePhaseCommitCohort cohort : cohorts) {
197 ops[index++] = cohort.commit();
203 * Aborts transaction.
206 * Invokes {@link DOMStoreThreePhaseCommitCohort#abort()} on all cohorts, blocks for all results. If any
207 * of the abort failed throws IllegalStateException, which will contains originalCause as suppressed Exception.
210 * If aborts we're successful throws supplied exception
212 * @param originalCause Exception which should be used to fail transaction for consumers of transaction future
213 * and listeners of transaction failure.
214 * @param phase phase in which the problem ensued
215 * @throws TransactionCommitFailedException on invocation of this method.
216 * @throws IllegalStateException if abort failed.
218 private void abortBlocking(final TransactionCommitFailedException originalCause)
219 throws TransactionCommitFailedException {
220 Exception cause = originalCause;
222 abortAsyncAll().get();
223 } catch (InterruptedException | ExecutionException e) {
224 LOG.error("Tx: {} Error during Abort.", tx.getIdentifier(), e);
225 cause = new IllegalStateException("Abort failed.", e);
226 cause.addSuppressed(e);
228 Throwables.propagateIfPossible(cause, TransactionCommitFailedException.class);
232 * Invokes abort on underlying cohorts and returns future which completes once all abort on cohorts are completed.
234 * @return Future which will complete once all cohorts completed abort.
236 @SuppressWarnings({"unchecked", "rawtypes"})
237 private ListenableFuture<Void> abortAsyncAll() {
239 final ListenableFuture<?>[] ops = new ListenableFuture<?>[cohorts.size()];
241 for (final DOMStoreThreePhaseCommitCohort cohort : cohorts) {
242 ops[index++] = cohort.abort();
246 * We are returning all futures as list, not only succeeded ones in
247 * order to fail composite future if any of them failed.
248 * See Futures.allAsList for this description.
250 return (ListenableFuture) Futures.allAsList(ops);