Specialize CommitCoordinationTask
[mdsal.git] / dom / mdsal-dom-broker / src / main / java / org / opendaylight / mdsal / dom / broker / CommitCoordinationTask.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.mdsal.dom.broker;
9
10 import static java.util.Objects.requireNonNull;
11
12 import com.google.common.base.Throwables;
13 import com.google.common.util.concurrent.Futures;
14 import com.google.common.util.concurrent.ListenableFuture;
15 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
16 import java.util.Collection;
17 import java.util.concurrent.Callable;
18 import java.util.concurrent.ExecutionException;
19 import org.opendaylight.mdsal.common.api.CommitInfo;
20 import org.opendaylight.mdsal.common.api.TransactionCommitFailedException;
21 import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteTransaction;
22 import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort;
23 import org.opendaylight.yangtools.util.DurationStatisticsTracker;
24 import org.slf4j.Logger;
25 import org.slf4j.LoggerFactory;
26
27 /**
28  * Implementation of blocking three-phase commit-coordination tasks without support of cancellation.
29  */
30 sealed class CommitCoordinationTask implements Callable<CommitInfo> {
31     static final class WithTracker extends CommitCoordinationTask {
32         private final DurationStatisticsTracker commitStatTracker;
33
34         WithTracker(final DOMDataTreeWriteTransaction transaction,
35                 final Collection<DOMStoreThreePhaseCommitCohort> cohorts,
36                 final DurationStatisticsTracker commitStatTracker) {
37             super(transaction, cohorts);
38             this.commitStatTracker = requireNonNull(commitStatTracker);
39         }
40
41         @Override
42         public CommitInfo call() throws TransactionCommitFailedException {
43             final long startTime = System.nanoTime();
44
45             try {
46                 return super.call();
47             } finally {
48                 commitStatTracker.addDuration(System.nanoTime() - startTime);
49             }
50         }
51     }
52
53     private enum Phase {
54         CAN_COMMIT,
55         PRE_COMMIT,
56         DO_COMMIT
57     }
58
59     private static final Logger LOG = LoggerFactory.getLogger(CommitCoordinationTask.class);
60     private final Collection<DOMStoreThreePhaseCommitCohort> cohorts;
61     private final DOMDataTreeWriteTransaction tx;
62
63     CommitCoordinationTask(final DOMDataTreeWriteTransaction transaction,
64             final Collection<DOMStoreThreePhaseCommitCohort> cohorts) {
65         tx = requireNonNull(transaction, "transaction must not be null");
66         this.cohorts = requireNonNull(cohorts, "cohorts must not be null");
67     }
68
69     @Override
70     public CommitInfo call() throws TransactionCommitFailedException {
71         var phase = Phase.CAN_COMMIT;
72         try {
73             LOG.debug("Transaction {}: canCommit Started", tx.getIdentifier());
74             canCommitBlocking();
75
76             phase = Phase.PRE_COMMIT;
77             LOG.debug("Transaction {}: preCommit Started", tx.getIdentifier());
78             preCommitBlocking();
79
80             phase = Phase.DO_COMMIT;
81             LOG.debug("Transaction {}: doCommit Started", tx.getIdentifier());
82             commitBlocking();
83
84             LOG.debug("Transaction {}: doCommit completed", tx.getIdentifier());
85             return CommitInfo.empty();
86         } catch (final TransactionCommitFailedException e) {
87             LOG.warn("Tx: {} Error during phase {}, starting Abort", tx.getIdentifier(), phase, e);
88             abortBlocking(e);
89             throw e;
90         }
91     }
92
93     /**
94      * Invokes canCommit on underlying cohorts and blocks till all results are returned.
95      *
96      * <p>
97      * Valid state transition is from SUBMITTED to CAN_COMMIT, if currentPhase is not SUBMITTED throws
98      * IllegalStateException.
99      *
100      * @throws TransactionCommitFailedException If one of cohorts failed can Commit
101      */
102     @SuppressFBWarnings("BC_UNCONFIRMED_CAST_OF_RETURN_VALUE")
103     private void canCommitBlocking() throws TransactionCommitFailedException {
104         for (final ListenableFuture<?> canCommit : canCommitAll()) {
105             try {
106                 final Boolean result = (Boolean)canCommit.get();
107                 if (result == null || !result) {
108                     throw new TransactionCommitFailedException("Can Commit failed, no detailed cause available.");
109                 }
110             } catch (InterruptedException | ExecutionException e) {
111                 throw TransactionCommitFailedExceptionMapper.CAN_COMMIT_ERROR_MAPPER.apply(e);
112             }
113         }
114     }
115
116     /**
117      * Invokes canCommit on underlying cohorts and returns composite future which will contain {@link Boolean#TRUE} only
118      * and only if all cohorts returned true.
119      *
120      * <p>
121      * Valid state transition is from SUBMITTED to CAN_COMMIT, if currentPhase is not SUBMITTED throws
122      * IllegalStateException.
123      *
124      * @return List of all cohorts futures from can commit phase.
125      */
126     private ListenableFuture<?>[] canCommitAll() {
127         final ListenableFuture<?>[] ops = new ListenableFuture<?>[cohorts.size()];
128         int index = 0;
129         for (final DOMStoreThreePhaseCommitCohort cohort : cohorts) {
130             ops[index++] = cohort.canCommit();
131         }
132         return ops;
133     }
134
135     /**
136      * Invokes preCommit on underlying cohorts and blocks until all results are returned.
137      *
138      * <p>
139      * Valid state transition is from CAN_COMMIT to PRE_COMMIT, if current state is not CAN_COMMIT throws
140      * IllegalStateException.
141      *
142      * @throws TransactionCommitFailedException If one of cohorts failed preCommit
143      */
144     @SuppressFBWarnings("BC_UNCONFIRMED_CAST_OF_RETURN_VALUE")
145     private void preCommitBlocking() throws TransactionCommitFailedException {
146         final ListenableFuture<?>[] preCommitFutures = preCommitAll();
147         try {
148             for (final ListenableFuture<?> future : preCommitFutures) {
149                 future.get();
150             }
151         } catch (InterruptedException | ExecutionException e) {
152             throw TransactionCommitFailedExceptionMapper.PRE_COMMIT_MAPPER.apply(e);
153         }
154     }
155
156     /**
157      * Invokes preCommit on underlying cohorts and returns future which will complete once all preCommit on cohorts
158      * completed or failed.
159      *
160      * <p>
161      * Valid state transition is from CAN_COMMIT to PRE_COMMIT, if current state is not CAN_COMMIT throws
162      * IllegalStateException.
163      *
164      * @return List of all cohorts futures from can commit phase.
165      */
166     private ListenableFuture<?>[] preCommitAll() {
167         final ListenableFuture<?>[] ops = new ListenableFuture<?>[cohorts.size()];
168         int index = 0;
169         for (final DOMStoreThreePhaseCommitCohort cohort : cohorts) {
170             ops[index++] = cohort.preCommit();
171         }
172         return ops;
173     }
174
175     /**
176      * Invokes commit on underlying cohorts and blocks until all results are returned.
177      *
178      * <p>
179      * Valid state transition is from PRE_COMMIT to COMMIT, if not throws IllegalStateException.
180      *
181      * @throws TransactionCommitFailedException If one of cohorts failed preCommit
182      */
183     @SuppressFBWarnings("BC_UNCONFIRMED_CAST_OF_RETURN_VALUE")
184     private void commitBlocking() throws TransactionCommitFailedException {
185         final ListenableFuture<?>[] commitFutures = commitAll();
186         try {
187             for (final ListenableFuture<?> future : commitFutures) {
188                 future.get();
189             }
190         } catch (InterruptedException | ExecutionException e) {
191             throw TransactionCommitFailedExceptionMapper.COMMIT_ERROR_MAPPER.apply(e);
192         }
193     }
194
195     /**
196      * Invokes commit on underlying cohorts and returns future which
197      * completes
198      * once all commits on cohorts are completed.
199      *
200      *<p>
201      * Valid state transition is from PRE_COMMIT to COMMIT, if not throws
202      * IllegalStateException
203      *
204      * @return List of all cohorts futures from can commit phase.
205      */
206     private ListenableFuture<?>[] commitAll() {
207         final ListenableFuture<?>[] ops = new ListenableFuture<?>[cohorts.size()];
208         int index = 0;
209         for (final DOMStoreThreePhaseCommitCohort cohort : cohorts) {
210             ops[index++] = cohort.commit();
211         }
212         return ops;
213     }
214
215     /**
216      * Aborts transaction.
217      *
218      * <p>
219      * Invokes {@link DOMStoreThreePhaseCommitCohort#abort()} on all cohorts, blocks for all results. If any
220      * of the abort failed throws IllegalStateException, which will contains originalCause as suppressed Exception.
221      *
222      * <p>
223      * If aborts we're successful throws supplied exception
224      *
225      * @param originalCause Exception which should be used to fail transaction for consumers of transaction future
226      *                      and listeners of transaction failure.
227      * @param phase phase in which the problem ensued
228      * @throws TransactionCommitFailedException on invocation of this method.
229      * @throws IllegalStateException if abort failed.
230      */
231     private void abortBlocking(final TransactionCommitFailedException originalCause)
232             throws TransactionCommitFailedException {
233         Exception cause = originalCause;
234         try {
235             abortAsyncAll().get();
236         } catch (InterruptedException | ExecutionException e) {
237             LOG.error("Tx: {} Error during Abort.", tx.getIdentifier(), e);
238             cause = new IllegalStateException("Abort failed.", e);
239             cause.addSuppressed(e);
240         }
241         Throwables.propagateIfPossible(cause, TransactionCommitFailedException.class);
242     }
243
244     /**
245      * Invokes abort on underlying cohorts and returns future which completes once all abort on cohorts are completed.
246      *
247      * @return Future which will complete once all cohorts completed abort.
248      */
249     @SuppressWarnings({"unchecked", "rawtypes"})
250     private ListenableFuture<Void> abortAsyncAll() {
251
252         final ListenableFuture<?>[] ops = new ListenableFuture<?>[cohorts.size()];
253         int index = 0;
254         for (final DOMStoreThreePhaseCommitCohort cohort : cohorts) {
255             ops[index++] = cohort.abort();
256         }
257
258         /*
259          * We are returning all futures as list, not only succeeded ones in
260          * order to fail composite future if any of them failed.
261          * See Futures.allAsList for this description.
262          */
263         return (ListenableFuture) Futures.allAsList(ops);
264     }
265 }