API Usability: Introduced type capture for Transaction Factory
[controller.git] / opendaylight / md-sal / sal-dom-broker / src / main / java / org / opendaylight / controller / md / sal / dom / broker / impl / DOMDataCommitCoordinatorImpl.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
3  * This program and the accompanying materials are made available under the
4  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
5  * and is available at http://www.eclipse.org/legal/epl-v10.html
6  */
7 package org.opendaylight.controller.md.sal.dom.broker.impl;
8
9 import java.util.Collections;
10 import java.util.List;
11 import java.util.concurrent.Callable;
12 import java.util.concurrent.ExecutionException;
13
14 import javax.annotation.concurrent.GuardedBy;
15
16 import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
17 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
18 import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
19 import org.opendaylight.controller.sal.common.util.Rpcs;
20 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
21 import org.opendaylight.yangtools.yang.common.RpcError;
22 import org.opendaylight.yangtools.yang.common.RpcResult;
23 import org.slf4j.Logger;
24 import org.slf4j.LoggerFactory;
25
26 import com.google.common.base.Function;
27 import com.google.common.base.Optional;
28 import com.google.common.base.Preconditions;
29 import com.google.common.base.Throwables;
30 import com.google.common.collect.ImmutableList;
31 import com.google.common.collect.ImmutableList.Builder;
32 import com.google.common.util.concurrent.CheckedFuture;
33 import com.google.common.util.concurrent.Futures;
34 import com.google.common.util.concurrent.ListenableFuture;
35 import com.google.common.util.concurrent.ListeningExecutorService;
36
37 /**
38  *
39  * Implementation of blocking three phase commit coordinator, which which
40  * supports coordination on multiple {@link DOMStoreThreePhaseCommitCohort}.
41  *
42  * This implementation does not support cancelation of commit,
43  *
44  * In order to advance to next phase of three phase commit all subtasks of
45  * previous step must be finish.
46  *
47  * This executor does not have an upper bound on subtask timeout.
48  *
49  *
50  */
51 public class DOMDataCommitCoordinatorImpl implements DOMDataCommitExecutor {
52
53     private static final Logger LOG = LoggerFactory.getLogger(DOMDataCommitCoordinatorImpl.class);
54
55     /**
56      * Runs AND binary operation between all booleans in supplied iteration of booleans.
57      *
58      * This method will stop evaluating iterables if first found is false.
59      */
60     private static final Function<Iterable<Boolean>, Boolean> AND_FUNCTION = new Function<Iterable<Boolean>, Boolean>() {
61
62         @Override
63         public Boolean apply(final Iterable<Boolean> input) {
64             for(boolean value : input) {
65                if(!value) {
66                    return Boolean.FALSE;
67                }
68             }
69             return Boolean.TRUE;
70         }
71     };
72
73     private final ListeningExecutorService executor;
74
75     /**
76      *
77      * Construct DOMDataCommitCoordinator which uses supplied executor to
78      * process commit coordinations.
79      *
80      * @param executor
81      */
82     public DOMDataCommitCoordinatorImpl(final ListeningExecutorService executor) {
83         this.executor = Preconditions.checkNotNull(executor, "executor must not be null.");
84     }
85
86     @Override
87     public ListenableFuture<RpcResult<TransactionStatus>> submit(final DOMDataWriteTransaction transaction,
88             final Iterable<DOMStoreThreePhaseCommitCohort> cohorts, final Optional<DOMDataCommitErrorListener> listener) {
89         Preconditions.checkArgument(transaction != null, "Transaction must not be null.");
90         Preconditions.checkArgument(cohorts != null, "Cohorts must not be null.");
91         Preconditions.checkArgument(listener != null, "Listener must not be null");
92         LOG.debug("Tx: {} is submitted for execution.", transaction.getIdentifier());
93         ListenableFuture<RpcResult<TransactionStatus>> commitFuture = executor.submit(new CommitCoordinationTask(
94                 transaction, cohorts, listener));
95         if (listener.isPresent()) {
96             Futures.addCallback(commitFuture, new DOMDataCommitErrorInvoker(transaction, listener.get()));
97         }
98         return commitFuture;
99     }
100
101     /**
102      *
103      * Phase of 3PC commit
104      *
105      * Represents phase of 3PC Commit
106      *
107      *
108      */
109     private static enum CommitPhase {
110         /**
111          *
112          * Commit Coordination Task is submitted for executing
113          *
114          */
115         SUBMITTED,
116         /**
117          * Commit Coordination Task is in can commit phase of 3PC
118          *
119          */
120         CAN_COMMIT,
121         /**
122          * Commit Coordination Task is in pre-commit phase of 3PC
123          *
124          */
125         PRE_COMMIT,
126         /**
127          * Commit Coordination Task is in commit phase of 3PC
128          *
129          */
130         COMMIT,
131         /**
132          * Commit Coordination Task is in abort phase of 3PC
133          *
134          */
135         ABORT
136     }
137
138     /**
139      *
140      * Implementation of blocking three-phase commit-coordination tasks without
141      * support of cancelation.
142      *
143      */
144     private static class CommitCoordinationTask implements Callable<RpcResult<TransactionStatus>> {
145
146         private final DOMDataWriteTransaction tx;
147         private final Iterable<DOMStoreThreePhaseCommitCohort> cohorts;
148
149         @GuardedBy("this")
150         private CommitPhase currentPhase;
151
152         public CommitCoordinationTask(final DOMDataWriteTransaction transaction,
153                 final Iterable<DOMStoreThreePhaseCommitCohort> cohorts,
154                 final Optional<DOMDataCommitErrorListener> listener) {
155             this.tx = Preconditions.checkNotNull(transaction, "transaction must not be null");
156             this.cohorts = Preconditions.checkNotNull(cohorts, "cohorts must not be null");
157             this.currentPhase = CommitPhase.SUBMITTED;
158         }
159
160         @Override
161         public RpcResult<TransactionStatus> call() throws TransactionCommitFailedException {
162
163             try {
164                 canCommitBlocking();
165                 preCommitBlocking();
166                 return commitBlocking();
167             } catch (TransactionCommitFailedException e) {
168                 LOG.warn("Tx: {} Error during phase {}, starting Abort", tx.getIdentifier(), currentPhase, e);
169                 abortBlocking(e);
170                 throw e;
171             }
172         }
173
174         /**
175          *
176          * Invokes canCommit on underlying cohorts and blocks till
177          * all results are returned.
178          *
179          * Valid state transition is from SUBMITTED to CAN_COMMIT,
180          * if currentPhase is not SUBMITTED throws IllegalStateException.
181          *
182          * @throws TransactionCommitFailedException
183          *             If one of cohorts failed can Commit
184          *
185          */
186         private void canCommitBlocking() throws TransactionCommitFailedException {
187             final Boolean canCommitResult = canCommitAll().checkedGet();
188             if (!canCommitResult) {
189                 throw new TransactionCommitFailedException("Can Commit failed, no detailed cause available.");
190             }
191         }
192
193         /**
194          *
195          * Invokes preCommit on underlying cohorts and blocks till
196          * all results are returned.
197          *
198          * Valid state transition is from CAN_COMMIT to PRE_COMMIT, if current
199          * state is not CAN_COMMIT
200          * throws IllegalStateException.
201          *
202          * @throws TransactionCommitFailedException
203          *             If one of cohorts failed preCommit
204          *
205          */
206         private void preCommitBlocking() throws TransactionCommitFailedException {
207             preCommitAll().checkedGet();
208         }
209
210         /**
211          *
212          * Invokes commit on underlying cohorts and blocks till
213          * all results are returned.
214          *
215          * Valid state transition is from PRE_COMMIT to COMMIT, if not throws
216          * IllegalStateException.
217          *
218          * @throws TransactionCommitFailedException
219          *             If one of cohorts failed preCommit
220          *
221          */
222         private RpcResult<TransactionStatus> commitBlocking() throws TransactionCommitFailedException {
223             commitAll().checkedGet();
224             return Rpcs.getRpcResult(true, TransactionStatus.COMMITED, Collections.<RpcError> emptySet());
225         }
226
227         /**
228          * Aborts transaction.
229          *
230          * Invokes {@link DOMStoreThreePhaseCommitCohort#abort()} on all
231          * cohorts, blocks
232          * for all results. If any of the abort failed throws
233          * IllegalStateException,
234          * which will contains originalCause as suppressed Exception.
235          *
236          * If aborts we're successful throws supplied exception
237          *
238          * @param originalCause
239          *            Exception which should be used to fail transaction for
240          *            consumers of transaction
241          *            future and listeners of transaction failure.
242          * @throws TransactionCommitFailedException
243          *             on invocation of this method.
244          *             originalCa
245          * @throws IllegalStateException
246          *             if abort failed.
247          */
248         private void abortBlocking(final TransactionCommitFailedException originalCause)
249                 throws TransactionCommitFailedException {
250             LOG.warn("Tx: {} Error during phase {}, starting Abort", tx.getIdentifier(), currentPhase, originalCause);
251             Exception cause = originalCause;
252             try {
253                 abortAsyncAll().get();
254             } catch (InterruptedException | ExecutionException e) {
255                 LOG.error("Tx: {} Error during Abort.", tx.getIdentifier(), e);
256                 cause = new IllegalStateException("Abort failed.", e);
257                 cause.addSuppressed(e);
258             }
259             Throwables.propagateIfPossible(cause, TransactionCommitFailedException.class);
260         }
261
262         /**
263          *
264          * Invokes preCommit on underlying cohorts and returns future
265          * which will complete once all preCommit on cohorts completed or
266          * failed.
267          *
268          *
269          * Valid state transition is from CAN_COMMIT to PRE_COMMIT, if current
270          * state is not CAN_COMMIT
271          * throws IllegalStateException.
272          *
273          * @return Future which will complete once all cohorts completed
274          *         preCommit.
275          *         Future throws TransactionCommitFailedException
276          *         If any of cohorts failed preCommit
277          *
278          */
279         private CheckedFuture<Void, TransactionCommitFailedException> preCommitAll() {
280             changeStateFrom(CommitPhase.CAN_COMMIT, CommitPhase.PRE_COMMIT);
281             Builder<ListenableFuture<Void>> ops = ImmutableList.builder();
282             for (DOMStoreThreePhaseCommitCohort cohort : cohorts) {
283                 ops.add(cohort.preCommit());
284             }
285             /*
286              * We are returing all futures as list, not only succeeded ones in
287              * order to fail composite future if any of them failed.
288              * See Futures.allAsList for this description.
289              */
290             @SuppressWarnings({ "unchecked", "rawtypes" })
291             ListenableFuture<Void> compositeResult = (ListenableFuture) Futures.allAsList(ops.build());
292             return Futures.makeChecked(compositeResult, TransactionCommitFailedExceptionMapper.PRE_COMMIT_MAPPER);
293         }
294
295         /**
296          *
297          * Invokes commit on underlying cohorts and returns future which
298          * completes
299          * once all commits on cohorts are completed.
300          *
301          * Valid state transition is from PRE_COMMIT to COMMIT, if not throws
302          * IllegalStateException
303          *
304          * @return Future which will complete once all cohorts completed
305          *         commit.
306          *         Future throws TransactionCommitFailedException
307          *         If any of cohorts failed preCommit
308          *
309          */
310         private CheckedFuture<Void, TransactionCommitFailedException> commitAll() {
311             changeStateFrom(CommitPhase.PRE_COMMIT, CommitPhase.COMMIT);
312             Builder<ListenableFuture<Void>> ops = ImmutableList.builder();
313             for (DOMStoreThreePhaseCommitCohort cohort : cohorts) {
314                 ops.add(cohort.commit());
315             }
316             /*
317              * We are returing all futures as list, not only succeeded ones in
318              * order to fail composite future if any of them failed.
319              * See Futures.allAsList for this description.
320              */
321             @SuppressWarnings({ "unchecked", "rawtypes" })
322             ListenableFuture<Void> compositeResult = (ListenableFuture) Futures.allAsList(ops.build());
323             return Futures.makeChecked(compositeResult, TransactionCommitFailedExceptionMapper.COMMIT_ERROR_MAPPER);
324         }
325
326         /**
327          *
328          * Invokes canCommit on underlying cohorts and returns composite future
329          * which will contains {@link Boolean#TRUE} only and only if
330          * all cohorts returned true.
331          *
332          * Valid state transition is from SUBMITTED to CAN_COMMIT,
333          * if currentPhase is not SUBMITTED throws IllegalStateException.
334          *
335          * @return Future which will complete once all cohorts completed
336          *         preCommit.
337          *         Future throws TransactionCommitFailedException
338          *         If any of cohorts failed preCommit
339          *
340          */
341         private CheckedFuture<Boolean, TransactionCommitFailedException> canCommitAll() {
342             changeStateFrom(CommitPhase.SUBMITTED, CommitPhase.CAN_COMMIT);
343             Builder<ListenableFuture<Boolean>> canCommitOperations = ImmutableList.builder();
344             for (DOMStoreThreePhaseCommitCohort cohort : cohorts) {
345                 canCommitOperations.add(cohort.canCommit());
346             }
347             ListenableFuture<List<Boolean>> allCanCommits = Futures.allAsList(canCommitOperations.build());
348             ListenableFuture<Boolean> allSuccessFuture = Futures.transform(allCanCommits, AND_FUNCTION);
349             return Futures
350                     .makeChecked(allSuccessFuture, TransactionCommitFailedExceptionMapper.CAN_COMMIT_ERROR_MAPPER);
351
352         }
353
354         /**
355          *
356          * Invokes abort on underlying cohorts and returns future which
357          * completes
358          * once all abort on cohorts are completed.
359          *
360          * @return Future which will complete once all cohorts completed
361          *         abort.
362          *
363          */
364         private ListenableFuture<Void> abortAsyncAll() {
365             changeStateFrom(currentPhase, CommitPhase.ABORT);
366             Builder<ListenableFuture<Void>> ops = ImmutableList.builder();
367             for (DOMStoreThreePhaseCommitCohort cohort : cohorts) {
368                 ops.add(cohort.abort());
369             }
370             /*
371              * We are returing all futures as list, not only succeeded ones in
372              * order to fail composite future if any of them failed.
373              * See Futures.allAsList for this description.
374              */
375             @SuppressWarnings({ "unchecked", "rawtypes" })
376             ListenableFuture<Void> compositeResult = (ListenableFuture) Futures.allAsList(ops.build());
377             return compositeResult;
378         }
379
380         /**
381          * Change phase / state of transaction from expected value to new value
382          *
383          * This method checks state and updates state to new state of
384          * of this task if current state equals expected state.
385          * If expected state and current state are different raises
386          * IllegalStateException
387          * which means there is probably bug in implementation of commit
388          * coordination.
389          *
390          * If transition is successful, it logs transition on DEBUG level.
391          *
392          * @param currentExpected
393          *            Required phase for change of state
394          * @param newState
395          *            New Phase which will be entered by transaction.
396          * @throws IllegalStateException
397          *             If currentState of task does not match expected state
398          */
399         private synchronized void changeStateFrom(final CommitPhase currentExpected, final CommitPhase newState) {
400             Preconditions.checkState(currentPhase.equals(currentExpected),
401                     "Invalid state transition: Tx: %s current state: %s new state: %s", tx.getIdentifier(),
402                     currentPhase, newState);
403             LOG.debug("Transaction {}: Phase {} Started ", tx.getIdentifier(), newState);
404             currentPhase = newState;
405         };
406
407     }
408
409 }