2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.mdsal.dom.broker;
10 import static java.util.Objects.requireNonNull;
12 import com.google.common.base.Throwables;
13 import com.google.common.util.concurrent.Futures;
14 import com.google.common.util.concurrent.ListenableFuture;
15 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
16 import java.util.Collection;
17 import java.util.concurrent.Callable;
18 import java.util.concurrent.ExecutionException;
19 import org.opendaylight.mdsal.common.api.CommitInfo;
20 import org.opendaylight.mdsal.common.api.TransactionCommitFailedException;
21 import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteTransaction;
22 import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort;
23 import org.opendaylight.yangtools.util.DurationStatisticsTracker;
24 import org.slf4j.Logger;
25 import org.slf4j.LoggerFactory;
28 * Implementation of blocking three-phase commit-coordination tasks without support of cancellation.
30 sealed class CommitCoordinationTask implements Callable<CommitInfo> {
31 static final class WithTracker extends CommitCoordinationTask {
32 private final DurationStatisticsTracker commitStatTracker;
34 WithTracker(final DOMDataTreeWriteTransaction transaction,
35 final Collection<DOMStoreThreePhaseCommitCohort> cohorts,
36 final DurationStatisticsTracker commitStatTracker) {
37 super(transaction, cohorts);
38 this.commitStatTracker = requireNonNull(commitStatTracker);
42 public CommitInfo call() throws TransactionCommitFailedException {
43 final long startTime = System.nanoTime();
48 commitStatTracker.addDuration(System.nanoTime() - startTime);
private static final Logger LOG = LoggerFactory.getLogger(CommitCoordinationTask.class);

// Transaction being committed; within the visible code it is only used for its identifier in log messages
private final DOMDataTreeWriteTransaction tx;
// Cohorts driven through canCommit, preCommit and commit (or abort) by this task
private final Collection<DOMStoreThreePhaseCommitCohort> cohorts;

/**
 * Creates a task coordinating the commit of {@code transaction} across {@code cohorts}.
 *
 * @param transaction transaction being committed
 * @param cohorts cohorts taking part in the commit
 * @throws NullPointerException if either argument is null
 */
CommitCoordinationTask(final DOMDataTreeWriteTransaction transaction,
        final Collection<DOMStoreThreePhaseCommitCohort> cohorts) {
    // The transaction is validated first, so it is the one reported when both arguments are null
    this.tx = requireNonNull(transaction, "transaction must not be null");
    this.cohorts = requireNonNull(cohorts, "cohorts must not be null");
}
70 public CommitInfo call() throws TransactionCommitFailedException {
71 var phase = Phase.CAN_COMMIT;
73 LOG.debug("Transaction {}: canCommit Started", tx.getIdentifier());
76 phase = Phase.PRE_COMMIT;
77 LOG.debug("Transaction {}: preCommit Started", tx.getIdentifier());
80 phase = Phase.DO_COMMIT;
81 LOG.debug("Transaction {}: doCommit Started", tx.getIdentifier());
84 LOG.debug("Transaction {}: doCommit completed", tx.getIdentifier());
85 return CommitInfo.empty();
86 } catch (final TransactionCommitFailedException e) {
87 LOG.warn("Tx: {} Error during phase {}, starting Abort", tx.getIdentifier(), phase, e);
94 * Invokes canCommit on underlying cohorts and blocks till all results are returned.
97 * Valid state transition is from SUBMITTED to CAN_COMMIT, if currentPhase is not SUBMITTED throws
98 * IllegalStateException.
100 * @throws TransactionCommitFailedException If one of cohorts failed can Commit
102 @SuppressFBWarnings("BC_UNCONFIRMED_CAST_OF_RETURN_VALUE")
103 private void canCommitBlocking() throws TransactionCommitFailedException {
104 for (final ListenableFuture<?> canCommit : canCommitAll()) {
106 final Boolean result = (Boolean)canCommit.get();
107 if (result == null || !result) {
108 throw new TransactionCommitFailedException("Can Commit failed, no detailed cause available.");
110 } catch (InterruptedException | ExecutionException e) {
111 throw TransactionCommitFailedExceptionMapper.CAN_COMMIT_ERROR_MAPPER.apply(e);
117 * Invokes canCommit on underlying cohorts and returns composite future which will contain {@link Boolean#TRUE} only
118 * and only if all cohorts returned true.
121 * Valid state transition is from SUBMITTED to CAN_COMMIT, if currentPhase is not SUBMITTED throws
122 * IllegalStateException.
124 * @return List of all cohorts futures from can commit phase.
126 private ListenableFuture<?>[] canCommitAll() {
127 final ListenableFuture<?>[] ops = new ListenableFuture<?>[cohorts.size()];
129 for (final DOMStoreThreePhaseCommitCohort cohort : cohorts) {
130 ops[index++] = cohort.canCommit();
136 * Invokes preCommit on underlying cohorts and blocks until all results are returned.
139 * Valid state transition is from CAN_COMMIT to PRE_COMMIT, if current state is not CAN_COMMIT throws
140 * IllegalStateException.
142 * @throws TransactionCommitFailedException If one of cohorts failed preCommit
144 @SuppressFBWarnings("BC_UNCONFIRMED_CAST_OF_RETURN_VALUE")
145 private void preCommitBlocking() throws TransactionCommitFailedException {
146 final ListenableFuture<?>[] preCommitFutures = preCommitAll();
148 for (final ListenableFuture<?> future : preCommitFutures) {
151 } catch (InterruptedException | ExecutionException e) {
152 throw TransactionCommitFailedExceptionMapper.PRE_COMMIT_MAPPER.apply(e);
157 * Invokes preCommit on underlying cohorts and returns future which will complete once all preCommit on cohorts
158 * completed or failed.
161 * Valid state transition is from CAN_COMMIT to PRE_COMMIT, if current state is not CAN_COMMIT throws
162 * IllegalStateException.
164 * @return List of all cohorts futures from can commit phase.
166 private ListenableFuture<?>[] preCommitAll() {
167 final ListenableFuture<?>[] ops = new ListenableFuture<?>[cohorts.size()];
169 for (final DOMStoreThreePhaseCommitCohort cohort : cohorts) {
170 ops[index++] = cohort.preCommit();
176 * Invokes commit on underlying cohorts and blocks until all results are returned.
179 * Valid state transition is from PRE_COMMIT to COMMIT, if not throws IllegalStateException.
181 * @throws TransactionCommitFailedException If one of cohorts failed preCommit
183 @SuppressFBWarnings("BC_UNCONFIRMED_CAST_OF_RETURN_VALUE")
184 private void commitBlocking() throws TransactionCommitFailedException {
185 final ListenableFuture<?>[] commitFutures = commitAll();
187 for (final ListenableFuture<?> future : commitFutures) {
190 } catch (InterruptedException | ExecutionException e) {
191 throw TransactionCommitFailedExceptionMapper.COMMIT_ERROR_MAPPER.apply(e);
196 * Invokes commit on underlying cohorts and returns future which
198 * once all commits on cohorts are completed.
201 * Valid state transition is from PRE_COMMIT to COMMIT, if not throws
202 * IllegalStateException
204 * @return List of all cohorts futures from can commit phase.
206 private ListenableFuture<?>[] commitAll() {
207 final ListenableFuture<?>[] ops = new ListenableFuture<?>[cohorts.size()];
209 for (final DOMStoreThreePhaseCommitCohort cohort : cohorts) {
210 ops[index++] = cohort.commit();
216 * Aborts transaction.
219 * Invokes {@link DOMStoreThreePhaseCommitCohort#abort()} on all cohorts, blocks for all results. If any
220 * of the abort failed throws IllegalStateException, which will contains originalCause as suppressed Exception.
223 * If aborts we're successful throws supplied exception
225 * @param originalCause Exception which should be used to fail transaction for consumers of transaction future
226 * and listeners of transaction failure.
227 * @param phase phase in which the problem ensued
228 * @throws TransactionCommitFailedException on invocation of this method.
229 * @throws IllegalStateException if abort failed.
231 private void abortBlocking(final TransactionCommitFailedException originalCause)
232 throws TransactionCommitFailedException {
233 Exception cause = originalCause;
235 abortAsyncAll().get();
236 } catch (InterruptedException | ExecutionException e) {
237 LOG.error("Tx: {} Error during Abort.", tx.getIdentifier(), e);
238 cause = new IllegalStateException("Abort failed.", e);
239 cause.addSuppressed(e);
241 Throwables.propagateIfPossible(cause, TransactionCommitFailedException.class);
245 * Invokes abort on underlying cohorts and returns future which completes once all abort on cohorts are completed.
247 * @return Future which will complete once all cohorts completed abort.
249 @SuppressWarnings({"unchecked", "rawtypes"})
250 private ListenableFuture<Void> abortAsyncAll() {
252 final ListenableFuture<?>[] ops = new ListenableFuture<?>[cohorts.size()];
254 for (final DOMStoreThreePhaseCommitCohort cohort : cohorts) {
255 ops[index++] = cohort.abort();
259 * We are returning all futures as list, not only succeeded ones in
260 * order to fail composite future if any of them failed.
261 * See Futures.allAsList for this description.
263 return (ListenableFuture) Futures.allAsList(ops);