fec0a7687fb01cc94a9f2c519a59aaaa994bbede
[controller.git] / opendaylight / md-sal / sal-dom-broker / src / main / java / org / opendaylight / controller / md / sal / dom / broker / impl / CommitCoordinationTask.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.md.sal.dom.broker.impl;
9
10 import static java.util.Objects.requireNonNull;
11
12 import com.google.common.base.Supplier;
13 import com.google.common.base.Throwables;
14 import com.google.common.util.concurrent.Futures;
15 import com.google.common.util.concurrent.ListenableFuture;
16 import java.util.Collection;
17 import java.util.concurrent.Callable;
18 import java.util.concurrent.ExecutionException;
19 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
20 import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
21 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
22 import org.opendaylight.yangtools.util.DurationStatisticsTracker;
23 import org.slf4j.Logger;
24 import org.slf4j.LoggerFactory;
25
26 /**
27  * Implementation of blocking three-phase commit-coordination tasks without
28  * support of cancellation.
29  */
30 @Deprecated
31 final class CommitCoordinationTask<T> implements Callable<T> {
32     private enum Phase {
33         canCommit,
34         preCommit,
35         doCommit,
36     }
37
38     private static final Logger LOG = LoggerFactory.getLogger(CommitCoordinationTask.class);
39     private final Collection<DOMStoreThreePhaseCommitCohort> cohorts;
40     private final DurationStatisticsTracker commitStatTracker;
41     private final DOMDataWriteTransaction tx;
42     private final Supplier<T> futureValueSupplier;
43
44     CommitCoordinationTask(final DOMDataWriteTransaction transaction,
45             final Collection<DOMStoreThreePhaseCommitCohort> cohorts,
46             final DurationStatisticsTracker commitStatTracker,
47             final Supplier<T> futureValueSupplier) {
48         this.tx = requireNonNull(transaction, "transaction must not be null");
49         this.cohorts = requireNonNull(cohorts, "cohorts must not be null");
50         this.commitStatTracker = commitStatTracker;
51         this.futureValueSupplier = futureValueSupplier;
52     }
53
54     @Override
55     public T call() throws TransactionCommitFailedException {
56         final long startTime = commitStatTracker != null ? System.nanoTime() : 0;
57
58         Phase phase = Phase.canCommit;
59
60         try {
61             LOG.debug("Transaction {}: canCommit Started", tx.getIdentifier());
62             canCommitBlocking();
63
64             phase = Phase.preCommit;
65             LOG.debug("Transaction {}: preCommit Started", tx.getIdentifier());
66             preCommitBlocking();
67
68             phase = Phase.doCommit;
69             LOG.debug("Transaction {}: doCommit Started", tx.getIdentifier());
70             commitBlocking();
71
72             LOG.debug("Transaction {}: doCommit completed", tx.getIdentifier());
73             return futureValueSupplier.get();
74         } catch (final TransactionCommitFailedException e) {
75             LOG.warn("Tx: {} Error during phase {}, starting Abort", tx.getIdentifier(), phase, e);
76             abortBlocking(e);
77             throw e;
78         } finally {
79             if (commitStatTracker != null) {
80                 commitStatTracker.addDuration(System.nanoTime() - startTime);
81             }
82         }
83     }
84
85     /**
86      * Invokes canCommit on underlying cohorts and blocks till
87      * all results are returned.
88      *
89      * <p>
90      * Valid state transition is from SUBMITTED to CAN_COMMIT,
91      * if currentPhase is not SUBMITTED throws IllegalStateException.
92      *
93      * @throws TransactionCommitFailedException
94      *             If one of cohorts failed can Commit
95      *
96      */
97     private void canCommitBlocking() throws TransactionCommitFailedException {
98         for (final ListenableFuture<?> canCommit : canCommitAll()) {
99             try {
100                 final Boolean result = (Boolean)canCommit.get();
101                 if (result == null || !result) {
102                     throw new TransactionCommitFailedException("Can Commit failed, no detailed cause available.");
103                 }
104             } catch (InterruptedException | ExecutionException e) {
105                 throw TransactionCommitFailedExceptionMapper.CAN_COMMIT_ERROR_MAPPER.apply(e);
106             }
107         }
108     }
109
110     /**
111      * Invokes canCommit on underlying cohorts and returns composite future
112      * which will contains {@link Boolean#TRUE} only and only if
113      * all cohorts returned true.
114      *
115      * <p>
116      * Valid state transition is from SUBMITTED to CAN_COMMIT,
117      * if currentPhase is not SUBMITTED throws IllegalStateException.
118      *
119      * @return List of all cohorts futures from can commit phase.
120      *
121      */
122     private ListenableFuture<?>[] canCommitAll() {
123         final ListenableFuture<?>[] ops = new ListenableFuture<?>[cohorts.size()];
124         int index = 0;
125         for (final DOMStoreThreePhaseCommitCohort cohort : cohorts) {
126             ops[index++] = cohort.canCommit();
127         }
128         return ops;
129     }
130
131     /**
132      * Invokes preCommit on underlying cohorts and blocks till
133      * all results are returned.
134      *
135      * <p>
136      * Valid state transition is from CAN_COMMIT to PRE_COMMIT, if current
137      * state is not CAN_COMMIT
138      * throws IllegalStateException.
139      *
140      * @throws TransactionCommitFailedException
141      *             If one of cohorts failed preCommit
142      *
143      */
144     private void preCommitBlocking() throws TransactionCommitFailedException {
145         final ListenableFuture<?>[] preCommitFutures = preCommitAll();
146         try {
147             for (final ListenableFuture<?> future : preCommitFutures) {
148                 future.get();
149             }
150         } catch (InterruptedException | ExecutionException e) {
151             throw TransactionCommitFailedExceptionMapper.PRE_COMMIT_MAPPER.apply(e);
152         }
153     }
154
155     /**
156      * Invokes preCommit on underlying cohorts and returns future
157      * which will complete once all preCommit on cohorts completed or
158      * failed.
159      *
160      * <p>
161      * Valid state transition is from CAN_COMMIT to PRE_COMMIT, if current
162      * state is not CAN_COMMIT
163      * throws IllegalStateException.
164      *
165      * @return List of all cohorts futures from can commit phase.
166      *
167      */
168     private ListenableFuture<?>[] preCommitAll() {
169         final ListenableFuture<?>[] ops = new ListenableFuture<?>[cohorts.size()];
170         int index = 0;
171         for (final DOMStoreThreePhaseCommitCohort cohort : cohorts) {
172             ops[index++] = cohort.preCommit();
173         }
174         return ops;
175     }
176
177     /**
178      * Invokes commit on underlying cohorts and blocks till
179      * all results are returned.
180      *
181      * <p>
182      * Valid state transition is from PRE_COMMIT to COMMIT, if not throws
183      * IllegalStateException.
184      *
185      * @throws TransactionCommitFailedException
186      *             If one of cohorts failed preCommit
187      *
188      */
189     private void commitBlocking() throws TransactionCommitFailedException {
190         final ListenableFuture<?>[] commitFutures = commitAll();
191         try {
192             for (final ListenableFuture<?> future : commitFutures) {
193                 future.get();
194             }
195         } catch (InterruptedException | ExecutionException e) {
196             throw TransactionCommitFailedExceptionMapper.COMMIT_ERROR_MAPPER.apply(e);
197         }
198     }
199
200     /**
201      * Invokes commit on underlying cohorts and returns future which
202      * completes
203      * once all commits on cohorts are completed.
204      *
205      * <p>
206      * Valid state transition is from PRE_COMMIT to COMMIT, if not throws
207      * IllegalStateException
208      *
209      * @return List of all cohorts futures from can commit phase.
210      */
211     private ListenableFuture<?>[] commitAll() {
212         final ListenableFuture<?>[] ops = new ListenableFuture<?>[cohorts.size()];
213         int index = 0;
214         for (final DOMStoreThreePhaseCommitCohort cohort : cohorts) {
215             ops[index++] = cohort.commit();
216         }
217         return ops;
218     }
219
220     /**
221      * Aborts transaction.
222      *
223      * <p>
224      * Invokes {@link DOMStoreThreePhaseCommitCohort#abort()} on all
225      * cohorts, blocks
226      * for all results. If any of the abort failed throws
227      * IllegalStateException,
228      * which will contains originalCause as suppressed Exception.
229      *
230      * <p>
231      * If aborts we're successful throws supplied exception
232      *
233      * @param originalCause
234      *            Exception which should be used to fail transaction for
235      *            consumers of transaction
236      *            future and listeners of transaction failure.
237      * @param phase phase in which the problem ensued
238      * @throws TransactionCommitFailedException
239      *             on invocation of this method.
240      *             originalCa
241      * @throws IllegalStateException
242      *             if abort failed.
243      */
244     private void abortBlocking(
245             final TransactionCommitFailedException originalCause) throws TransactionCommitFailedException {
246         Exception cause = originalCause;
247         try {
248             abortAsyncAll().get();
249         } catch (InterruptedException | ExecutionException e) {
250             LOG.error("Tx: {} Error during Abort.", tx.getIdentifier(), e);
251             cause = new IllegalStateException("Abort failed.", e);
252             cause.addSuppressed(e);
253         }
254         Throwables.propagateIfPossible(cause, TransactionCommitFailedException.class);
255     }
256
257     /**
258      * Invokes abort on underlying cohorts and returns future which
259      * completes once all abort on cohorts are completed.
260      *
261      * @return Future which will complete once all cohorts completed
262      *         abort.
263      */
264     @SuppressWarnings({"unchecked", "rawtypes"})
265     private ListenableFuture<Void> abortAsyncAll() {
266
267         final ListenableFuture<?>[] ops = new ListenableFuture<?>[cohorts.size()];
268         int index = 0;
269         for (final DOMStoreThreePhaseCommitCohort cohort : cohorts) {
270             ops[index++] = cohort.abort();
271         }
272
273         /*
274          * We are returning all futures as list, not only succeeded ones in
275          * order to fail composite future if any of them failed.
276          * See Futures.allAsList for this description.
277          */
278         return (ListenableFuture) Futures.allAsList(ops);
279     }
280 }

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.