Merge "Removing { } from NormalizedNodeJsonBodyWriter"
[controller.git] / opendaylight / md-sal / sal-dom-broker / src / main / java / org / opendaylight / controller / md / sal / dom / broker / impl / CommitCoordinationTask.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
3  * This program and the accompanying materials are made available under the
4  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
5  * and is available at http://www.eclipse.org/legal/epl-v10.html
6  */
7 package org.opendaylight.controller.md.sal.dom.broker.impl;
8
9 import com.google.common.base.Preconditions;
10 import com.google.common.base.Throwables;
11 import com.google.common.util.concurrent.Futures;
12 import com.google.common.util.concurrent.ListenableFuture;
13 import java.util.Collection;
14 import java.util.concurrent.Callable;
15 import java.util.concurrent.ExecutionException;
16 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
17 import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
18 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
19 import org.opendaylight.yangtools.util.DurationStatisticsTracker;
20 import org.slf4j.Logger;
21 import org.slf4j.LoggerFactory;
22
23 /**
24  * Implementation of blocking three-phase commit-coordination tasks without
25  * support of cancellation.
26  */
27 final class CommitCoordinationTask implements Callable<Void> {
28     private static enum Phase {
29         canCommit,
30         preCommit,
31         doCommit,
32     };
33
34     private static final Logger LOG = LoggerFactory.getLogger(CommitCoordinationTask.class);
35     private final Collection<DOMStoreThreePhaseCommitCohort> cohorts;
36     private final DurationStatisticsTracker commitStatTracker;
37     private final DOMDataWriteTransaction tx;
38
39     public CommitCoordinationTask(final DOMDataWriteTransaction transaction,
40             final Collection<DOMStoreThreePhaseCommitCohort> cohorts,
41             final DurationStatisticsTracker commitStatTracker) {
42         this.tx = Preconditions.checkNotNull(transaction, "transaction must not be null");
43         this.cohorts = Preconditions.checkNotNull(cohorts, "cohorts must not be null");
44         this.commitStatTracker = commitStatTracker;
45     }
46
47     @Override
48     public Void call() throws TransactionCommitFailedException {
49         final long startTime = commitStatTracker != null ? System.nanoTime() : 0;
50
51         Phase phase = Phase.canCommit;
52
53         try {
54             LOG.debug("Transaction {}: canCommit Started", tx.getIdentifier());
55             canCommitBlocking();
56
57             phase = Phase.preCommit;
58             LOG.debug("Transaction {}: preCommit Started", tx.getIdentifier());
59             preCommitBlocking();
60
61             phase = Phase.doCommit;
62             LOG.debug("Transaction {}: doCommit Started", tx.getIdentifier());
63             commitBlocking();
64
65             LOG.debug("Transaction {}: doCommit completed", tx.getIdentifier());
66             return null;
67         } catch (TransactionCommitFailedException e) {
68             LOG.warn("Tx: {} Error during phase {}, starting Abort", tx.getIdentifier(), phase, e);
69             abortBlocking(e);
70             throw e;
71         } finally {
72             if (commitStatTracker != null) {
73                 commitStatTracker.addDuration(System.nanoTime() - startTime);
74             }
75         }
76     }
77
78     /**
79      *
80      * Invokes canCommit on underlying cohorts and blocks till
81      * all results are returned.
82      *
83      * Valid state transition is from SUBMITTED to CAN_COMMIT,
84      * if currentPhase is not SUBMITTED throws IllegalStateException.
85      *
86      * @throws TransactionCommitFailedException
87      *             If one of cohorts failed can Commit
88      *
89      */
90     private void canCommitBlocking() throws TransactionCommitFailedException {
91         for (ListenableFuture<?> canCommit : canCommitAll()) {
92             try {
93                 final Boolean result = (Boolean)canCommit.get();
94                 if (result == null || !result) {
95                     throw new TransactionCommitFailedException("Can Commit failed, no detailed cause available.");
96                 }
97             } catch (InterruptedException | ExecutionException e) {
98                 throw TransactionCommitFailedExceptionMapper.CAN_COMMIT_ERROR_MAPPER.apply(e);
99             }
100         }
101     }
102
103     /**
104      *
105      * Invokes canCommit on underlying cohorts and returns composite future
106      * which will contains {@link Boolean#TRUE} only and only if
107      * all cohorts returned true.
108      *
109      * Valid state transition is from SUBMITTED to CAN_COMMIT,
110      * if currentPhase is not SUBMITTED throws IllegalStateException.
111      *
112      * @return List of all cohorts futures from can commit phase.
113      *
114      */
115     private ListenableFuture<?>[] canCommitAll() {
116         final ListenableFuture<?>[] ops = new ListenableFuture<?>[cohorts.size()];
117         int i = 0;
118         for (DOMStoreThreePhaseCommitCohort cohort : cohorts) {
119             ops[i++] = cohort.canCommit();
120         }
121         return ops;
122     }
123
124     /**
125      *
126      * Invokes preCommit on underlying cohorts and blocks till
127      * all results are returned.
128      *
129      * Valid state transition is from CAN_COMMIT to PRE_COMMIT, if current
130      * state is not CAN_COMMIT
131      * throws IllegalStateException.
132      *
133      * @throws TransactionCommitFailedException
134      *             If one of cohorts failed preCommit
135      *
136      */
137     private void preCommitBlocking() throws TransactionCommitFailedException {
138         final ListenableFuture<?>[] preCommitFutures = preCommitAll();
139         try {
140             for(ListenableFuture<?> future : preCommitFutures) {
141                 future.get();
142             }
143         } catch (InterruptedException | ExecutionException e) {
144             throw TransactionCommitFailedExceptionMapper.PRE_COMMIT_MAPPER.apply(e);
145         }
146     }
147
148     /**
149      *
150      * Invokes preCommit on underlying cohorts and returns future
151      * which will complete once all preCommit on cohorts completed or
152      * failed.
153      *
154      *
155      * Valid state transition is from CAN_COMMIT to PRE_COMMIT, if current
156      * state is not CAN_COMMIT
157      * throws IllegalStateException.
158      *
159      * @return List of all cohorts futures from can commit phase.
160      *
161      */
162     private ListenableFuture<?>[] preCommitAll() {
163         final ListenableFuture<?>[] ops = new ListenableFuture<?>[cohorts.size()];
164         int i = 0;
165         for (DOMStoreThreePhaseCommitCohort cohort : cohorts) {
166             ops[i++] = cohort.preCommit();
167         }
168         return ops;
169     }
170
171     /**
172      *
173      * Invokes commit on underlying cohorts and blocks till
174      * all results are returned.
175      *
176      * Valid state transition is from PRE_COMMIT to COMMIT, if not throws
177      * IllegalStateException.
178      *
179      * @throws TransactionCommitFailedException
180      *             If one of cohorts failed preCommit
181      *
182      */
183     private void commitBlocking() throws TransactionCommitFailedException {
184         final ListenableFuture<?>[] commitFutures = commitAll();
185         try {
186             for(ListenableFuture<?> future : commitFutures) {
187                 future.get();
188             }
189         } catch (InterruptedException | ExecutionException e) {
190             throw TransactionCommitFailedExceptionMapper.COMMIT_ERROR_MAPPER.apply(e);
191         }
192     }
193
194     /**
195      *
196      * Invokes commit on underlying cohorts and returns future which
197      * completes
198      * once all commits on cohorts are completed.
199      *
200      * Valid state transition is from PRE_COMMIT to COMMIT, if not throws
201      * IllegalStateException
202      *
203      * @return List of all cohorts futures from can commit phase.
204      */
205     private ListenableFuture<?>[] commitAll() {
206         final ListenableFuture<?>[] ops = new ListenableFuture<?>[cohorts.size()];
207         int i = 0;
208         for (DOMStoreThreePhaseCommitCohort cohort : cohorts) {
209             ops[i++] = cohort.commit();
210         }
211         return ops;
212     }
213
214     /**
215      * Aborts transaction.
216      *
217      * Invokes {@link DOMStoreThreePhaseCommitCohort#abort()} on all
218      * cohorts, blocks
219      * for all results. If any of the abort failed throws
220      * IllegalStateException,
221      * which will contains originalCause as suppressed Exception.
222      *
223      * If aborts we're successful throws supplied exception
224      *
225      * @param originalCause
226      *            Exception which should be used to fail transaction for
227      *            consumers of transaction
228      *            future and listeners of transaction failure.
229      * @param phase phase in which the problem ensued
230      * @throws TransactionCommitFailedException
231      *             on invocation of this method.
232      *             originalCa
233      * @throws IllegalStateException
234      *             if abort failed.
235      */
236     private void abortBlocking(final TransactionCommitFailedException originalCause) throws TransactionCommitFailedException {
237         Exception cause = originalCause;
238         try {
239             abortAsyncAll().get();
240         } catch (InterruptedException | ExecutionException e) {
241             LOG.error("Tx: {} Error during Abort.", tx.getIdentifier(), e);
242             cause = new IllegalStateException("Abort failed.", e);
243             cause.addSuppressed(e);
244         }
245         Throwables.propagateIfPossible(cause, TransactionCommitFailedException.class);
246     }
247
248     /**
249      * Invokes abort on underlying cohorts and returns future which
250      * completes once all abort on cohorts are completed.
251      *
252      * @return Future which will complete once all cohorts completed
253      *         abort.
254      */
255     private ListenableFuture<Void> abortAsyncAll() {
256
257         final ListenableFuture<?>[] ops = new ListenableFuture<?>[cohorts.size()];
258         int i = 0;
259         for (DOMStoreThreePhaseCommitCohort cohort : cohorts) {
260             ops[i++] = cohort.abort();
261         }
262
263         /*
264          * We are returning all futures as list, not only succeeded ones in
265          * order to fail composite future if any of them failed.
266          * See Futures.allAsList for this description.
267          */
268         @SuppressWarnings({ "unchecked", "rawtypes" })
269         ListenableFuture<Void> compositeResult = (ListenableFuture) Futures.allAsList(ops);
270         return compositeResult;
271     }
272 }