Add AsyncWriteTransaction.commit()
[controller.git] / opendaylight / md-sal / sal-dom-broker / src / main / java / org / opendaylight / controller / md / sal / dom / broker / impl / CommitCoordinationTask.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.md.sal.dom.broker.impl;
10
11 import com.google.common.base.Preconditions;
12 import com.google.common.base.Supplier;
13 import com.google.common.base.Throwables;
14 import com.google.common.util.concurrent.Futures;
15 import com.google.common.util.concurrent.ListenableFuture;
16 import java.util.Collection;
17 import java.util.concurrent.Callable;
18 import java.util.concurrent.ExecutionException;
19 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
20 import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
21 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
22 import org.opendaylight.yangtools.util.DurationStatisticsTracker;
23 import org.slf4j.Logger;
24 import org.slf4j.LoggerFactory;
25
26 /**
27  * Implementation of blocking three-phase commit-coordination tasks without
28  * support of cancellation.
29  */
30 final class CommitCoordinationTask<T> implements Callable<T> {
31     private enum Phase {
32         canCommit,
33         preCommit,
34         doCommit,
35     }
36
37     private static final Logger LOG = LoggerFactory.getLogger(CommitCoordinationTask.class);
38     private final Collection<DOMStoreThreePhaseCommitCohort> cohorts;
39     private final DurationStatisticsTracker commitStatTracker;
40     private final DOMDataWriteTransaction tx;
41     private final Supplier<T> futureValueSupplier;
42
43     CommitCoordinationTask(final DOMDataWriteTransaction transaction,
44             final Collection<DOMStoreThreePhaseCommitCohort> cohorts,
45             final DurationStatisticsTracker commitStatTracker,
46             final Supplier<T> futureValueSupplier) {
47         this.tx = Preconditions.checkNotNull(transaction, "transaction must not be null");
48         this.cohorts = Preconditions.checkNotNull(cohorts, "cohorts must not be null");
49         this.commitStatTracker = commitStatTracker;
50         this.futureValueSupplier = futureValueSupplier;
51     }
52
53     @Override
54     public T call() throws TransactionCommitFailedException {
55         final long startTime = commitStatTracker != null ? System.nanoTime() : 0;
56
57         Phase phase = Phase.canCommit;
58
59         try {
60             LOG.debug("Transaction {}: canCommit Started", tx.getIdentifier());
61             canCommitBlocking();
62
63             phase = Phase.preCommit;
64             LOG.debug("Transaction {}: preCommit Started", tx.getIdentifier());
65             preCommitBlocking();
66
67             phase = Phase.doCommit;
68             LOG.debug("Transaction {}: doCommit Started", tx.getIdentifier());
69             commitBlocking();
70
71             LOG.debug("Transaction {}: doCommit completed", tx.getIdentifier());
72             return futureValueSupplier.get();
73         } catch (final TransactionCommitFailedException e) {
74             LOG.warn("Tx: {} Error during phase {}, starting Abort", tx.getIdentifier(), phase, e);
75             abortBlocking(e);
76             throw e;
77         } finally {
78             if (commitStatTracker != null) {
79                 commitStatTracker.addDuration(System.nanoTime() - startTime);
80             }
81         }
82     }
83
84     /**
85      * Invokes canCommit on underlying cohorts and blocks till
86      * all results are returned.
87      *
88      * <p>
89      * Valid state transition is from SUBMITTED to CAN_COMMIT,
90      * if currentPhase is not SUBMITTED throws IllegalStateException.
91      *
92      * @throws TransactionCommitFailedException
93      *             If one of cohorts failed can Commit
94      *
95      */
96     private void canCommitBlocking() throws TransactionCommitFailedException {
97         for (final ListenableFuture<?> canCommit : canCommitAll()) {
98             try {
99                 final Boolean result = (Boolean)canCommit.get();
100                 if (result == null || !result) {
101                     throw new TransactionCommitFailedException("Can Commit failed, no detailed cause available.");
102                 }
103             } catch (InterruptedException | ExecutionException e) {
104                 throw TransactionCommitFailedExceptionMapper.CAN_COMMIT_ERROR_MAPPER.apply(e);
105             }
106         }
107     }
108
109     /**
110      * Invokes canCommit on underlying cohorts and returns composite future
111      * which will contains {@link Boolean#TRUE} only and only if
112      * all cohorts returned true.
113      *
114      * <p>
115      * Valid state transition is from SUBMITTED to CAN_COMMIT,
116      * if currentPhase is not SUBMITTED throws IllegalStateException.
117      *
118      * @return List of all cohorts futures from can commit phase.
119      *
120      */
121     private ListenableFuture<?>[] canCommitAll() {
122         final ListenableFuture<?>[] ops = new ListenableFuture<?>[cohorts.size()];
123         int index = 0;
124         for (final DOMStoreThreePhaseCommitCohort cohort : cohorts) {
125             ops[index++] = cohort.canCommit();
126         }
127         return ops;
128     }
129
130     /**
131      * Invokes preCommit on underlying cohorts and blocks till
132      * all results are returned.
133      *
134      * <p>
135      * Valid state transition is from CAN_COMMIT to PRE_COMMIT, if current
136      * state is not CAN_COMMIT
137      * throws IllegalStateException.
138      *
139      * @throws TransactionCommitFailedException
140      *             If one of cohorts failed preCommit
141      *
142      */
143     private void preCommitBlocking() throws TransactionCommitFailedException {
144         final ListenableFuture<?>[] preCommitFutures = preCommitAll();
145         try {
146             for (final ListenableFuture<?> future : preCommitFutures) {
147                 future.get();
148             }
149         } catch (InterruptedException | ExecutionException e) {
150             throw TransactionCommitFailedExceptionMapper.PRE_COMMIT_MAPPER.apply(e);
151         }
152     }
153
154     /**
155      * Invokes preCommit on underlying cohorts and returns future
156      * which will complete once all preCommit on cohorts completed or
157      * failed.
158      *
159      * <p>
160      * Valid state transition is from CAN_COMMIT to PRE_COMMIT, if current
161      * state is not CAN_COMMIT
162      * throws IllegalStateException.
163      *
164      * @return List of all cohorts futures from can commit phase.
165      *
166      */
167     private ListenableFuture<?>[] preCommitAll() {
168         final ListenableFuture<?>[] ops = new ListenableFuture<?>[cohorts.size()];
169         int index = 0;
170         for (final DOMStoreThreePhaseCommitCohort cohort : cohorts) {
171             ops[index++] = cohort.preCommit();
172         }
173         return ops;
174     }
175
176     /**
177      * Invokes commit on underlying cohorts and blocks till
178      * all results are returned.
179      *
180      * <p>
181      * Valid state transition is from PRE_COMMIT to COMMIT, if not throws
182      * IllegalStateException.
183      *
184      * @throws TransactionCommitFailedException
185      *             If one of cohorts failed preCommit
186      *
187      */
188     private void commitBlocking() throws TransactionCommitFailedException {
189         final ListenableFuture<?>[] commitFutures = commitAll();
190         try {
191             for (final ListenableFuture<?> future : commitFutures) {
192                 future.get();
193             }
194         } catch (InterruptedException | ExecutionException e) {
195             throw TransactionCommitFailedExceptionMapper.COMMIT_ERROR_MAPPER.apply(e);
196         }
197     }
198
199     /**
200      * Invokes commit on underlying cohorts and returns future which
201      * completes
202      * once all commits on cohorts are completed.
203      *
204      * <p>
205      * Valid state transition is from PRE_COMMIT to COMMIT, if not throws
206      * IllegalStateException
207      *
208      * @return List of all cohorts futures from can commit phase.
209      */
210     private ListenableFuture<?>[] commitAll() {
211         final ListenableFuture<?>[] ops = new ListenableFuture<?>[cohorts.size()];
212         int index = 0;
213         for (final DOMStoreThreePhaseCommitCohort cohort : cohorts) {
214             ops[index++] = cohort.commit();
215         }
216         return ops;
217     }
218
219     /**
220      * Aborts transaction.
221      *
222      * <p>
223      * Invokes {@link DOMStoreThreePhaseCommitCohort#abort()} on all
224      * cohorts, blocks
225      * for all results. If any of the abort failed throws
226      * IllegalStateException,
227      * which will contains originalCause as suppressed Exception.
228      *
229      * <p>
230      * If aborts we're successful throws supplied exception
231      *
232      * @param originalCause
233      *            Exception which should be used to fail transaction for
234      *            consumers of transaction
235      *            future and listeners of transaction failure.
236      * @param phase phase in which the problem ensued
237      * @throws TransactionCommitFailedException
238      *             on invocation of this method.
239      *             originalCa
240      * @throws IllegalStateException
241      *             if abort failed.
242      */
243     private void abortBlocking(
244             final TransactionCommitFailedException originalCause) throws TransactionCommitFailedException {
245         Exception cause = originalCause;
246         try {
247             abortAsyncAll().get();
248         } catch (InterruptedException | ExecutionException e) {
249             LOG.error("Tx: {} Error during Abort.", tx.getIdentifier(), e);
250             cause = new IllegalStateException("Abort failed.", e);
251             cause.addSuppressed(e);
252         }
253         Throwables.propagateIfPossible(cause, TransactionCommitFailedException.class);
254     }
255
256     /**
257      * Invokes abort on underlying cohorts and returns future which
258      * completes once all abort on cohorts are completed.
259      *
260      * @return Future which will complete once all cohorts completed
261      *         abort.
262      */
263     @SuppressWarnings({"unchecked", "rawtypes"})
264     private ListenableFuture<Void> abortAsyncAll() {
265
266         final ListenableFuture<?>[] ops = new ListenableFuture<?>[cohorts.size()];
267         int index = 0;
268         for (final DOMStoreThreePhaseCommitCohort cohort : cohorts) {
269             ops[index++] = cohort.abort();
270         }
271
272         /*
273          * We are returning all futures as list, not only succeeded ones in
274          * order to fail composite future if any of them failed.
275          * See Futures.allAsList for this description.
276          */
277         return (ListenableFuture) Futures.allAsList(ops);
278     }
279 }