Switch to Objects.requireNonNull
[mdsal.git] / dom / mdsal-dom-broker / src / main / java / org / opendaylight / mdsal / dom / broker / CommitCoordinationTask.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.mdsal.dom.broker;
9
10 import static java.util.Objects.requireNonNull;
11
12 import com.google.common.base.Throwables;
13 import com.google.common.util.concurrent.Futures;
14 import com.google.common.util.concurrent.ListenableFuture;
15 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
16 import java.util.Collection;
17 import java.util.concurrent.Callable;
18 import java.util.concurrent.ExecutionException;
19 import org.opendaylight.mdsal.common.api.CommitInfo;
20 import org.opendaylight.mdsal.common.api.TransactionCommitFailedException;
21 import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteTransaction;
22 import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort;
23 import org.opendaylight.yangtools.util.DurationStatisticsTracker;
24 import org.slf4j.Logger;
25 import org.slf4j.LoggerFactory;
26
27 /**
28  * Implementation of blocking three-phase commit-coordination tasks without support of cancellation.
29  */
30 final class CommitCoordinationTask implements Callable<CommitInfo> {
31     private enum Phase {
32         CAN_COMMIT,
33         PRE_COMMIT,
34         DO_COMMIT
35     }
36
37     private static final Logger LOG = LoggerFactory.getLogger(CommitCoordinationTask.class);
38     private final Collection<DOMStoreThreePhaseCommitCohort> cohorts;
39     private final DurationStatisticsTracker commitStatTracker;
40     private final DOMDataTreeWriteTransaction tx;
41
42     CommitCoordinationTask(final DOMDataTreeWriteTransaction transaction,
43             final Collection<DOMStoreThreePhaseCommitCohort> cohorts,
44             final DurationStatisticsTracker commitStatTracker) {
45         this.tx = requireNonNull(transaction, "transaction must not be null");
46         this.cohorts = requireNonNull(cohorts, "cohorts must not be null");
47         this.commitStatTracker = commitStatTracker;
48     }
49
50     @Override
51     public CommitInfo call() throws TransactionCommitFailedException {
52         final long startTime = commitStatTracker != null ? System.nanoTime() : 0;
53
54         Phase phase = Phase.CAN_COMMIT;
55
56         try {
57             LOG.debug("Transaction {}: canCommit Started", tx.getIdentifier());
58             canCommitBlocking();
59
60             phase = Phase.PRE_COMMIT;
61             LOG.debug("Transaction {}: preCommit Started", tx.getIdentifier());
62             preCommitBlocking();
63
64             phase = Phase.DO_COMMIT;
65             LOG.debug("Transaction {}: doCommit Started", tx.getIdentifier());
66             commitBlocking();
67
68             LOG.debug("Transaction {}: doCommit completed", tx.getIdentifier());
69             return CommitInfo.empty();
70         } catch (final TransactionCommitFailedException e) {
71             LOG.warn("Tx: {} Error during phase {}, starting Abort", tx.getIdentifier(), phase, e);
72             abortBlocking(e);
73             throw e;
74         } finally {
75             if (commitStatTracker != null) {
76                 commitStatTracker.addDuration(System.nanoTime() - startTime);
77             }
78         }
79     }
80
81     /**
82      * Invokes canCommit on underlying cohorts and blocks till all results are returned.
83      *
84      * <p>
85      * Valid state transition is from SUBMITTED to CAN_COMMIT, if currentPhase is not SUBMITTED throws
86      * IllegalStateException.
87      *
88      * @throws TransactionCommitFailedException If one of cohorts failed can Commit
89      */
90     @SuppressFBWarnings("BC_UNCONFIRMED_CAST_OF_RETURN_VALUE")
91     private void canCommitBlocking() throws TransactionCommitFailedException {
92         for (final ListenableFuture<?> canCommit : canCommitAll()) {
93             try {
94                 final Boolean result = (Boolean)canCommit.get();
95                 if (result == null || !result) {
96                     throw new TransactionCommitFailedException("Can Commit failed, no detailed cause available.");
97                 }
98             } catch (InterruptedException | ExecutionException e) {
99                 throw TransactionCommitFailedExceptionMapper.CAN_COMMIT_ERROR_MAPPER.apply(e);
100             }
101         }
102     }
103
104     /**
105      * Invokes canCommit on underlying cohorts and returns composite future which will contain {@link Boolean#TRUE} only
106      * and only if all cohorts returned true.
107      *
108      * <p>
109      * Valid state transition is from SUBMITTED to CAN_COMMIT, if currentPhase is not SUBMITTED throws
110      * IllegalStateException.
111      *
112      * @return List of all cohorts futures from can commit phase.
113      */
114     private ListenableFuture<?>[] canCommitAll() {
115         final ListenableFuture<?>[] ops = new ListenableFuture<?>[cohorts.size()];
116         int index = 0;
117         for (final DOMStoreThreePhaseCommitCohort cohort : cohorts) {
118             ops[index++] = cohort.canCommit();
119         }
120         return ops;
121     }
122
123     /**
124      * Invokes preCommit on underlying cohorts and blocks until all results are returned.
125      *
126      * <p>
127      * Valid state transition is from CAN_COMMIT to PRE_COMMIT, if current state is not CAN_COMMIT throws
128      * IllegalStateException.
129      *
130      * @throws TransactionCommitFailedException If one of cohorts failed preCommit
131      */
132     @SuppressFBWarnings("BC_UNCONFIRMED_CAST_OF_RETURN_VALUE")
133     private void preCommitBlocking() throws TransactionCommitFailedException {
134         final ListenableFuture<?>[] preCommitFutures = preCommitAll();
135         try {
136             for (final ListenableFuture<?> future : preCommitFutures) {
137                 future.get();
138             }
139         } catch (InterruptedException | ExecutionException e) {
140             throw TransactionCommitFailedExceptionMapper.PRE_COMMIT_MAPPER.apply(e);
141         }
142     }
143
144     /**
145      * Invokes preCommit on underlying cohorts and returns future which will complete once all preCommit on cohorts
146      * completed or failed.
147      *
148      * <p>
149      * Valid state transition is from CAN_COMMIT to PRE_COMMIT, if current state is not CAN_COMMIT throws
150      * IllegalStateException.
151      *
152      * @return List of all cohorts futures from can commit phase.
153      */
154     private ListenableFuture<?>[] preCommitAll() {
155         final ListenableFuture<?>[] ops = new ListenableFuture<?>[cohorts.size()];
156         int index = 0;
157         for (final DOMStoreThreePhaseCommitCohort cohort : cohorts) {
158             ops[index++] = cohort.preCommit();
159         }
160         return ops;
161     }
162
163     /**
164      * Invokes commit on underlying cohorts and blocks until all results are returned.
165      *
166      * <p>
167      * Valid state transition is from PRE_COMMIT to COMMIT, if not throws IllegalStateException.
168      *
169      * @throws TransactionCommitFailedException If one of cohorts failed preCommit
170      */
171     @SuppressFBWarnings("BC_UNCONFIRMED_CAST_OF_RETURN_VALUE")
172     private void commitBlocking() throws TransactionCommitFailedException {
173         final ListenableFuture<?>[] commitFutures = commitAll();
174         try {
175             for (final ListenableFuture<?> future : commitFutures) {
176                 future.get();
177             }
178         } catch (InterruptedException | ExecutionException e) {
179             throw TransactionCommitFailedExceptionMapper.COMMIT_ERROR_MAPPER.apply(e);
180         }
181     }
182
183     /**
184      * Invokes commit on underlying cohorts and returns future which
185      * completes
186      * once all commits on cohorts are completed.
187      *
188      *<p>
189      * Valid state transition is from PRE_COMMIT to COMMIT, if not throws
190      * IllegalStateException
191      *
192      * @return List of all cohorts futures from can commit phase.
193      */
194     private ListenableFuture<?>[] commitAll() {
195         final ListenableFuture<?>[] ops = new ListenableFuture<?>[cohorts.size()];
196         int index = 0;
197         for (final DOMStoreThreePhaseCommitCohort cohort : cohorts) {
198             ops[index++] = cohort.commit();
199         }
200         return ops;
201     }
202
203     /**
204      * Aborts transaction.
205      *
206      * <p>
207      * Invokes {@link DOMStoreThreePhaseCommitCohort#abort()} on all cohorts, blocks for all results. If any
208      * of the abort failed throws IllegalStateException, which will contains originalCause as suppressed Exception.
209      *
210      * <p>
211      * If aborts we're successful throws supplied exception
212      *
213      * @param originalCause Exception which should be used to fail transaction for consumers of transaction future
214      *                      and listeners of transaction failure.
215      * @param phase phase in which the problem ensued
216      * @throws TransactionCommitFailedException on invocation of this method.
217      * @throws IllegalStateException if abort failed.
218      */
219     private void abortBlocking(final TransactionCommitFailedException originalCause)
220             throws TransactionCommitFailedException {
221         Exception cause = originalCause;
222         try {
223             abortAsyncAll().get();
224         } catch (InterruptedException | ExecutionException e) {
225             LOG.error("Tx: {} Error during Abort.", tx.getIdentifier(), e);
226             cause = new IllegalStateException("Abort failed.", e);
227             cause.addSuppressed(e);
228         }
229         Throwables.propagateIfPossible(cause, TransactionCommitFailedException.class);
230     }
231
232     /**
233      * Invokes abort on underlying cohorts and returns future which completes once all abort on cohorts are completed.
234      *
235      * @return Future which will complete once all cohorts completed abort.
236      */
237     @SuppressWarnings({"unchecked", "rawtypes"})
238     private ListenableFuture<Void> abortAsyncAll() {
239
240         final ListenableFuture<?>[] ops = new ListenableFuture<?>[cohorts.size()];
241         int index = 0;
242         for (final DOMStoreThreePhaseCommitCohort cohort : cohorts) {
243             ops[index++] = cohort.abort();
244         }
245
246         /*
247          * We are returning all futures as list, not only succeeded ones in
248          * order to fail composite future if any of them failed.
249          * See Futures.allAsList for this description.
250          */
251         return (ListenableFuture) Futures.allAsList(ops);
252     }
253 }