efe4c19c5910a75c67f48661a131cbfb8722c59c
[controller.git] / opendaylight / md-sal / sal-dom-broker / src / main / java / org / opendaylight / controller / md / sal / dom / broker / impl / CommitCoordinationTask.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.md.sal.dom.broker.impl;
10
11 import com.google.common.base.Preconditions;
12 import com.google.common.base.Throwables;
13 import com.google.common.util.concurrent.Futures;
14 import com.google.common.util.concurrent.ListenableFuture;
15 import java.util.Collection;
16 import java.util.concurrent.Callable;
17 import java.util.concurrent.ExecutionException;
18 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
19 import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
20 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
21 import org.opendaylight.yangtools.util.DurationStatisticsTracker;
22 import org.slf4j.Logger;
23 import org.slf4j.LoggerFactory;
24
25 /**
26  * Implementation of blocking three-phase commit-coordination tasks without
27  * support of cancellation.
28  */
29 final class CommitCoordinationTask implements Callable<Void> {
30     private enum Phase {
31         canCommit,
32         preCommit,
33         doCommit,
34     }
35
36     private static final Logger LOG = LoggerFactory.getLogger(CommitCoordinationTask.class);
37     private final Collection<DOMStoreThreePhaseCommitCohort> cohorts;
38     private final DurationStatisticsTracker commitStatTracker;
39     private final DOMDataWriteTransaction tx;
40
41     public CommitCoordinationTask(final DOMDataWriteTransaction transaction,
42             final Collection<DOMStoreThreePhaseCommitCohort> cohorts,
43             final DurationStatisticsTracker commitStatTracker) {
44         this.tx = Preconditions.checkNotNull(transaction, "transaction must not be null");
45         this.cohorts = Preconditions.checkNotNull(cohorts, "cohorts must not be null");
46         this.commitStatTracker = commitStatTracker;
47     }
48
49     @Override
50     public Void call() throws TransactionCommitFailedException {
51         final long startTime = commitStatTracker != null ? System.nanoTime() : 0;
52
53         Phase phase = Phase.canCommit;
54
55         try {
56             LOG.debug("Transaction {}: canCommit Started", tx.getIdentifier());
57             canCommitBlocking();
58
59             phase = Phase.preCommit;
60             LOG.debug("Transaction {}: preCommit Started", tx.getIdentifier());
61             preCommitBlocking();
62
63             phase = Phase.doCommit;
64             LOG.debug("Transaction {}: doCommit Started", tx.getIdentifier());
65             commitBlocking();
66
67             LOG.debug("Transaction {}: doCommit completed", tx.getIdentifier());
68             return null;
69         } catch (final TransactionCommitFailedException e) {
70             LOG.warn("Tx: {} Error during phase {}, starting Abort", tx.getIdentifier(), phase, e);
71             abortBlocking(e);
72             throw e;
73         } finally {
74             if (commitStatTracker != null) {
75                 commitStatTracker.addDuration(System.nanoTime() - startTime);
76             }
77         }
78     }
79
80     /**
81      *
82      * Invokes canCommit on underlying cohorts and blocks till
83      * all results are returned.
84      *
85      * Valid state transition is from SUBMITTED to CAN_COMMIT,
86      * if currentPhase is not SUBMITTED throws IllegalStateException.
87      *
88      * @throws TransactionCommitFailedException
89      *             If one of cohorts failed can Commit
90      *
91      */
92     private void canCommitBlocking() throws TransactionCommitFailedException {
93         for (final ListenableFuture<?> canCommit : canCommitAll()) {
94             try {
95                 final Boolean result = (Boolean)canCommit.get();
96                 if (result == null || !result) {
97                     throw new TransactionCommitFailedException("Can Commit failed, no detailed cause available.");
98                 }
99             } catch (InterruptedException | ExecutionException e) {
100                 throw TransactionCommitFailedExceptionMapper.CAN_COMMIT_ERROR_MAPPER.apply(e);
101             }
102         }
103     }
104
105     /**
106      *
107      * Invokes canCommit on underlying cohorts and returns composite future
108      * which will contains {@link Boolean#TRUE} only and only if
109      * all cohorts returned true.
110      *
111      * Valid state transition is from SUBMITTED to CAN_COMMIT,
112      * if currentPhase is not SUBMITTED throws IllegalStateException.
113      *
114      * @return List of all cohorts futures from can commit phase.
115      *
116      */
117     private ListenableFuture<?>[] canCommitAll() {
118         final ListenableFuture<?>[] ops = new ListenableFuture<?>[cohorts.size()];
119         int i = 0;
120         for (final DOMStoreThreePhaseCommitCohort cohort : cohorts) {
121             ops[i++] = cohort.canCommit();
122         }
123         return ops;
124     }
125
126     /**
127      *
128      * Invokes preCommit on underlying cohorts and blocks till
129      * all results are returned.
130      *
131      * Valid state transition is from CAN_COMMIT to PRE_COMMIT, if current
132      * state is not CAN_COMMIT
133      * throws IllegalStateException.
134      *
135      * @throws TransactionCommitFailedException
136      *             If one of cohorts failed preCommit
137      *
138      */
139     private void preCommitBlocking() throws TransactionCommitFailedException {
140         final ListenableFuture<?>[] preCommitFutures = preCommitAll();
141         try {
142             for(final ListenableFuture<?> future : preCommitFutures) {
143                 future.get();
144             }
145         } catch (InterruptedException | ExecutionException e) {
146             throw TransactionCommitFailedExceptionMapper.PRE_COMMIT_MAPPER.apply(e);
147         }
148     }
149
150     /**
151      *
152      * Invokes preCommit on underlying cohorts and returns future
153      * which will complete once all preCommit on cohorts completed or
154      * failed.
155      *
156      *
157      * Valid state transition is from CAN_COMMIT to PRE_COMMIT, if current
158      * state is not CAN_COMMIT
159      * throws IllegalStateException.
160      *
161      * @return List of all cohorts futures from can commit phase.
162      *
163      */
164     private ListenableFuture<?>[] preCommitAll() {
165         final ListenableFuture<?>[] ops = new ListenableFuture<?>[cohorts.size()];
166         int i = 0;
167         for (final DOMStoreThreePhaseCommitCohort cohort : cohorts) {
168             ops[i++] = cohort.preCommit();
169         }
170         return ops;
171     }
172
173     /**
174      *
175      * Invokes commit on underlying cohorts and blocks till
176      * all results are returned.
177      *
178      * Valid state transition is from PRE_COMMIT to COMMIT, if not throws
179      * IllegalStateException.
180      *
181      * @throws TransactionCommitFailedException
182      *             If one of cohorts failed preCommit
183      *
184      */
185     private void commitBlocking() throws TransactionCommitFailedException {
186         final ListenableFuture<?>[] commitFutures = commitAll();
187         try {
188             for(final ListenableFuture<?> future : commitFutures) {
189                 future.get();
190             }
191         } catch (InterruptedException | ExecutionException e) {
192             throw TransactionCommitFailedExceptionMapper.COMMIT_ERROR_MAPPER.apply(e);
193         }
194     }
195
196     /**
197      *
198      * Invokes commit on underlying cohorts and returns future which
199      * completes
200      * once all commits on cohorts are completed.
201      *
202      * Valid state transition is from PRE_COMMIT to COMMIT, if not throws
203      * IllegalStateException
204      *
205      * @return List of all cohorts futures from can commit phase.
206      */
207     private ListenableFuture<?>[] commitAll() {
208         final ListenableFuture<?>[] ops = new ListenableFuture<?>[cohorts.size()];
209         int i = 0;
210         for (final DOMStoreThreePhaseCommitCohort cohort : cohorts) {
211             ops[i++] = cohort.commit();
212         }
213         return ops;
214     }
215
216     /**
217      * Aborts transaction.
218      *
219      * Invokes {@link DOMStoreThreePhaseCommitCohort#abort()} on all
220      * cohorts, blocks
221      * for all results. If any of the abort failed throws
222      * IllegalStateException,
223      * which will contains originalCause as suppressed Exception.
224      *
225      * If aborts we're successful throws supplied exception
226      *
227      * @param originalCause
228      *            Exception which should be used to fail transaction for
229      *            consumers of transaction
230      *            future and listeners of transaction failure.
231      * @param phase phase in which the problem ensued
232      * @throws TransactionCommitFailedException
233      *             on invocation of this method.
234      *             originalCa
235      * @throws IllegalStateException
236      *             if abort failed.
237      */
238     private void abortBlocking(final TransactionCommitFailedException originalCause) throws TransactionCommitFailedException {
239         Exception cause = originalCause;
240         try {
241             abortAsyncAll().get();
242         } catch (InterruptedException | ExecutionException e) {
243             LOG.error("Tx: {} Error during Abort.", tx.getIdentifier(), e);
244             cause = new IllegalStateException("Abort failed.", e);
245             cause.addSuppressed(e);
246         }
247         Throwables.propagateIfPossible(cause, TransactionCommitFailedException.class);
248     }
249
250     /**
251      * Invokes abort on underlying cohorts and returns future which
252      * completes once all abort on cohorts are completed.
253      *
254      * @return Future which will complete once all cohorts completed
255      *         abort.
256      */
257     @SuppressWarnings({"unchecked", "rawtypes"})
258     private ListenableFuture<Void> abortAsyncAll() {
259
260         final ListenableFuture<?>[] ops = new ListenableFuture<?>[cohorts.size()];
261         int i = 0;
262         for (final DOMStoreThreePhaseCommitCohort cohort : cohorts) {
263             ops[i++] = cohort.abort();
264         }
265
266         /*
267          * We are returning all futures as list, not only succeeded ones in
268          * order to fail composite future if any of them failed.
269          * See Futures.allAsList for this description.
270          */
271         return (ListenableFuture) Futures.allAsList(ops);
272     }
273 }

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.