d243a4da1ec48506c53d2d6861e649a1677bfa52
[mdsal.git] / dom / mdsal-dom-broker / src / main / java / org / opendaylight / mdsal / dom / broker / CommitCoordinationTask.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.mdsal.dom.broker;
10
11 import com.google.common.base.Preconditions;
12 import com.google.common.base.Throwables;
13 import com.google.common.util.concurrent.Futures;
14 import com.google.common.util.concurrent.ListenableFuture;
15 import java.util.Collection;
16 import java.util.concurrent.Callable;
17 import java.util.concurrent.ExecutionException;
18 import org.opendaylight.mdsal.common.api.CommitInfo;
19 import org.opendaylight.mdsal.common.api.TransactionCommitFailedException;
20 import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteTransaction;
21 import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort;
22 import org.opendaylight.yangtools.util.DurationStatisticsTracker;
23 import org.slf4j.Logger;
24 import org.slf4j.LoggerFactory;
25
26 /**
27  * Implementation of blocking three-phase commit-coordination tasks without
28  * support of cancellation.
29  */
30 final class CommitCoordinationTask implements Callable<CommitInfo> {
31     private enum Phase {
32         CAN_COMMIT,
33         PRE_COMMIT,
34         DO_COMMIT
35     }
36
37     private static final Logger LOG = LoggerFactory.getLogger(CommitCoordinationTask.class);
38     private final Collection<DOMStoreThreePhaseCommitCohort> cohorts;
39     private final DurationStatisticsTracker commitStatTracker;
40     private final DOMDataTreeWriteTransaction tx;
41
42     CommitCoordinationTask(final DOMDataTreeWriteTransaction transaction,
43             final Collection<DOMStoreThreePhaseCommitCohort> cohorts,
44             final DurationStatisticsTracker commitStatTracker) {
45         this.tx = Preconditions.checkNotNull(transaction, "transaction must not be null");
46         this.cohorts = Preconditions.checkNotNull(cohorts, "cohorts must not be null");
47         this.commitStatTracker = commitStatTracker;
48     }
49
50     @Override
51     public CommitInfo call() throws TransactionCommitFailedException {
52         final long startTime = commitStatTracker != null ? System.nanoTime() : 0;
53
54         Phase phase = Phase.CAN_COMMIT;
55
56         try {
57             LOG.debug("Transaction {}: canCommit Started", tx.getIdentifier());
58             canCommitBlocking();
59
60             phase = Phase.PRE_COMMIT;
61             LOG.debug("Transaction {}: preCommit Started", tx.getIdentifier());
62             preCommitBlocking();
63
64             phase = Phase.DO_COMMIT;
65             LOG.debug("Transaction {}: doCommit Started", tx.getIdentifier());
66             commitBlocking();
67
68             LOG.debug("Transaction {}: doCommit completed", tx.getIdentifier());
69             return CommitInfo.empty();
70         } catch (final TransactionCommitFailedException e) {
71             LOG.warn("Tx: {} Error during phase {}, starting Abort", tx.getIdentifier(), phase, e);
72             abortBlocking(e);
73             throw e;
74         } finally {
75             if (commitStatTracker != null) {
76                 commitStatTracker.addDuration(System.nanoTime() - startTime);
77             }
78         }
79     }
80
81     /**
82      * Invokes canCommit on underlying cohorts and blocks till
83      * all results are returned.
84      *
85      *<p>
86      * Valid state transition is from SUBMITTED to CAN_COMMIT,
87      * if currentPhase is not SUBMITTED throws IllegalStateException.
88      *
89      * @throws TransactionCommitFailedException
90      *             If one of cohorts failed can Commit
91      *
92      */
93     private void canCommitBlocking() throws TransactionCommitFailedException {
94         for (final ListenableFuture<?> canCommit : canCommitAll()) {
95             try {
96                 final Boolean result = (Boolean)canCommit.get();
97                 if (result == null || !result) {
98                     throw new TransactionCommitFailedException("Can Commit failed, no detailed cause available.");
99                 }
100             } catch (InterruptedException | ExecutionException e) {
101                 throw TransactionCommitFailedExceptionMapper.CAN_COMMIT_ERROR_MAPPER.apply(e);
102             }
103         }
104     }
105
106     /**
107      * Invokes canCommit on underlying cohorts and returns composite future
108      * which will contains {@link Boolean#TRUE} only and only if
109      * all cohorts returned true.
110      *
111      *<p>
112      * Valid state transition is from SUBMITTED to CAN_COMMIT,
113      * if currentPhase is not SUBMITTED throws IllegalStateException.
114      *
115      * @return List of all cohorts futures from can commit phase.
116      *
117      */
118     private ListenableFuture<?>[] canCommitAll() {
119         final ListenableFuture<?>[] ops = new ListenableFuture<?>[cohorts.size()];
120         int index = 0;
121         for (final DOMStoreThreePhaseCommitCohort cohort : cohorts) {
122             ops[index++] = cohort.canCommit();
123         }
124         return ops;
125     }
126
127     /**
128      * Invokes preCommit on underlying cohorts and blocks till
129      * all results are returned.
130      *
131      *<p>
132      * Valid state transition is from CAN_COMMIT to PRE_COMMIT, if current
133      * state is not CAN_COMMIT
134      * throws IllegalStateException.
135      *
136      * @throws TransactionCommitFailedException
137      *             If one of cohorts failed preCommit
138      *
139      */
140     private void preCommitBlocking() throws TransactionCommitFailedException {
141         final ListenableFuture<?>[] preCommitFutures = preCommitAll();
142         try {
143             for (final ListenableFuture<?> future : preCommitFutures) {
144                 future.get();
145             }
146         } catch (InterruptedException | ExecutionException e) {
147             throw TransactionCommitFailedExceptionMapper.PRE_COMMIT_MAPPER.apply(e);
148         }
149     }
150
151     /**
152      * Invokes preCommit on underlying cohorts and returns future
153      * which will complete once all preCommit on cohorts completed or
154      * failed.
155      *
156      *<p>
157      * Valid state transition is from CAN_COMMIT to PRE_COMMIT, if current
158      * state is not CAN_COMMIT
159      * throws IllegalStateException.
160      *
161      * @return List of all cohorts futures from can commit phase.
162      *
163      */
164     private ListenableFuture<?>[] preCommitAll() {
165         final ListenableFuture<?>[] ops = new ListenableFuture<?>[cohorts.size()];
166         int index = 0;
167         for (final DOMStoreThreePhaseCommitCohort cohort : cohorts) {
168             ops[index++] = cohort.preCommit();
169         }
170         return ops;
171     }
172
173     /**
174      * Invokes commit on underlying cohorts and blocks till
175      * all results are returned.
176      *
177      *<p>
178      * Valid state transition is from PRE_COMMIT to COMMIT, if not throws
179      * IllegalStateException.
180      *
181      * @throws TransactionCommitFailedException
182      *             If one of cohorts failed preCommit
183      *
184      */
185     private void commitBlocking() throws TransactionCommitFailedException {
186         final ListenableFuture<?>[] commitFutures = commitAll();
187         try {
188             for (final ListenableFuture<?> future : commitFutures) {
189                 future.get();
190             }
191         } catch (InterruptedException | ExecutionException e) {
192             throw TransactionCommitFailedExceptionMapper.COMMIT_ERROR_MAPPER.apply(e);
193         }
194     }
195
196     /**
197      * Invokes commit on underlying cohorts and returns future which
198      * completes
199      * once all commits on cohorts are completed.
200      *
201      *<p>
202      * Valid state transition is from PRE_COMMIT to COMMIT, if not throws
203      * IllegalStateException
204      *
205      * @return List of all cohorts futures from can commit phase.
206      */
207     private ListenableFuture<?>[] commitAll() {
208         final ListenableFuture<?>[] ops = new ListenableFuture<?>[cohorts.size()];
209         int index = 0;
210         for (final DOMStoreThreePhaseCommitCohort cohort : cohorts) {
211             ops[index++] = cohort.commit();
212         }
213         return ops;
214     }
215
216     /**
217      * Aborts transaction.
218      *
219      *<p>
220      * Invokes {@link DOMStoreThreePhaseCommitCohort#abort()} on all
221      * cohorts, blocks
222      * for all results. If any of the abort failed throws
223      * IllegalStateException,
224      * which will contains originalCause as suppressed Exception.
225      *
226      *<p>
227      * If aborts we're successful throws supplied exception
228      *
229      * @param originalCause
230      *            Exception which should be used to fail transaction for
231      *            consumers of transaction
232      *            future and listeners of transaction failure.
233      * @param phase phase in which the problem ensued
234      * @throws TransactionCommitFailedException
235      *             on invocation of this method.
236      *             originalCa
237      * @throws IllegalStateException
238      *             if abort failed.
239      */
240     private void abortBlocking(final TransactionCommitFailedException originalCause)
241             throws TransactionCommitFailedException {
242         Exception cause = originalCause;
243         try {
244             abortAsyncAll().get();
245         } catch (InterruptedException | ExecutionException e) {
246             LOG.error("Tx: {} Error during Abort.", tx.getIdentifier(), e);
247             cause = new IllegalStateException("Abort failed.", e);
248             cause.addSuppressed(e);
249         }
250         Throwables.propagateIfPossible(cause, TransactionCommitFailedException.class);
251     }
252
253     /**
254      * Invokes abort on underlying cohorts and returns future which
255      * completes once all abort on cohorts are completed.
256      *
257      * @return Future which will complete once all cohorts completed
258      *         abort.
259      */
260     @SuppressWarnings({"unchecked", "rawtypes"})
261     private ListenableFuture<Void> abortAsyncAll() {
262
263         final ListenableFuture<?>[] ops = new ListenableFuture<?>[cohorts.size()];
264         int index = 0;
265         for (final DOMStoreThreePhaseCommitCohort cohort : cohorts) {
266             ops[index++] = cohort.abort();
267         }
268
269         /*
270          * We are returning all futures as list, not only succeeded ones in
271          * order to fail composite future if any of them failed.
272          * See Futures.allAsList for this description.
273          */
274         return (ListenableFuture) Futures.allAsList(ops);
275     }
276 }