Merge "Bug 1239 - Clean up and refactor netconf-ssh client"
[controller.git] / opendaylight / md-sal / sal-dom-broker / src / main / java / org / opendaylight / controller / md / sal / dom / broker / impl / DOMDataCommitCoordinatorImpl.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
3  * This program and the accompanying materials are made available under the
4  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
5  * and is available at http://www.eclipse.org/legal/epl-v10.html
6  */
7 package org.opendaylight.controller.md.sal.dom.broker.impl;
8
9 import java.util.List;
10 import java.util.concurrent.Callable;
11 import java.util.concurrent.ExecutionException;
12
13 import javax.annotation.concurrent.GuardedBy;
14
15 import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
16 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
17 import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
18 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
19 import org.opendaylight.yangtools.yang.common.RpcResult;
20 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
21 import org.slf4j.Logger;
22 import org.slf4j.LoggerFactory;
23
24 import com.google.common.base.Function;
25 import com.google.common.base.Optional;
26 import com.google.common.base.Preconditions;
27 import com.google.common.base.Throwables;
28 import com.google.common.collect.ImmutableList;
29 import com.google.common.collect.ImmutableList.Builder;
30 import com.google.common.util.concurrent.CheckedFuture;
31 import com.google.common.util.concurrent.Futures;
32 import com.google.common.util.concurrent.ListenableFuture;
33 import com.google.common.util.concurrent.ListeningExecutorService;
34
35 /**
36  *
37  * Implementation of blocking three phase commit coordinator, which which
38  * supports coordination on multiple {@link DOMStoreThreePhaseCommitCohort}.
39  *
40  * This implementation does not support cancelation of commit,
41  *
42  * In order to advance to next phase of three phase commit all subtasks of
43  * previous step must be finish.
44  *
45  * This executor does not have an upper bound on subtask timeout.
46  *
47  *
48  */
49 public class DOMDataCommitCoordinatorImpl implements DOMDataCommitExecutor {
50
51     private static final Logger LOG = LoggerFactory.getLogger(DOMDataCommitCoordinatorImpl.class);
52
53     /**
54      * Runs AND binary operation between all booleans in supplied iteration of booleans.
55      *
56      * This method will stop evaluating iterables if first found is false.
57      */
58     private static final Function<Iterable<Boolean>, Boolean> AND_FUNCTION = new Function<Iterable<Boolean>, Boolean>() {
59
60         @Override
61         public Boolean apply(final Iterable<Boolean> input) {
62             for(boolean value : input) {
63                if(!value) {
64                    return Boolean.FALSE;
65                }
66             }
67             return Boolean.TRUE;
68         }
69     };
70
71     private final ListeningExecutorService executor;
72
73     /**
74      *
75      * Construct DOMDataCommitCoordinator which uses supplied executor to
76      * process commit coordinations.
77      *
78      * @param executor
79      */
80     public DOMDataCommitCoordinatorImpl(final ListeningExecutorService executor) {
81         this.executor = Preconditions.checkNotNull(executor, "executor must not be null.");
82     }
83
84     @Override
85     public ListenableFuture<RpcResult<TransactionStatus>> submit(final DOMDataWriteTransaction transaction,
86             final Iterable<DOMStoreThreePhaseCommitCohort> cohorts, final Optional<DOMDataCommitErrorListener> listener) {
87         Preconditions.checkArgument(transaction != null, "Transaction must not be null.");
88         Preconditions.checkArgument(cohorts != null, "Cohorts must not be null.");
89         Preconditions.checkArgument(listener != null, "Listener must not be null");
90         LOG.debug("Tx: {} is submitted for execution.", transaction.getIdentifier());
91         ListenableFuture<RpcResult<TransactionStatus>> commitFuture = executor.submit(new CommitCoordinationTask(
92                 transaction, cohorts, listener));
93         if (listener.isPresent()) {
94             Futures.addCallback(commitFuture, new DOMDataCommitErrorInvoker(transaction, listener.get()));
95         }
96         return commitFuture;
97     }
98
99     /**
100      *
101      * Phase of 3PC commit
102      *
103      * Represents phase of 3PC Commit
104      *
105      *
106      */
107     private static enum CommitPhase {
108         /**
109          *
110          * Commit Coordination Task is submitted for executing
111          *
112          */
113         SUBMITTED,
114         /**
115          * Commit Coordination Task is in can commit phase of 3PC
116          *
117          */
118         CAN_COMMIT,
119         /**
120          * Commit Coordination Task is in pre-commit phase of 3PC
121          *
122          */
123         PRE_COMMIT,
124         /**
125          * Commit Coordination Task is in commit phase of 3PC
126          *
127          */
128         COMMIT,
129         /**
130          * Commit Coordination Task is in abort phase of 3PC
131          *
132          */
133         ABORT
134     }
135
136     /**
137      *
138      * Implementation of blocking three-phase commit-coordination tasks without
139      * support of cancelation.
140      *
141      */
142     private static class CommitCoordinationTask implements Callable<RpcResult<TransactionStatus>> {
143
144         private final DOMDataWriteTransaction tx;
145         private final Iterable<DOMStoreThreePhaseCommitCohort> cohorts;
146
147         @GuardedBy("this")
148         private CommitPhase currentPhase;
149
150         public CommitCoordinationTask(final DOMDataWriteTransaction transaction,
151                 final Iterable<DOMStoreThreePhaseCommitCohort> cohorts,
152                 final Optional<DOMDataCommitErrorListener> listener) {
153             this.tx = Preconditions.checkNotNull(transaction, "transaction must not be null");
154             this.cohorts = Preconditions.checkNotNull(cohorts, "cohorts must not be null");
155             this.currentPhase = CommitPhase.SUBMITTED;
156         }
157
158         @Override
159         public RpcResult<TransactionStatus> call() throws TransactionCommitFailedException {
160
161             try {
162                 canCommitBlocking();
163                 preCommitBlocking();
164                 return commitBlocking();
165             } catch (TransactionCommitFailedException e) {
166                 LOG.warn("Tx: {} Error during phase {}, starting Abort", tx.getIdentifier(), currentPhase, e);
167                 abortBlocking(e);
168                 throw e;
169             }
170         }
171
172         /**
173          *
174          * Invokes canCommit on underlying cohorts and blocks till
175          * all results are returned.
176          *
177          * Valid state transition is from SUBMITTED to CAN_COMMIT,
178          * if currentPhase is not SUBMITTED throws IllegalStateException.
179          *
180          * @throws TransactionCommitFailedException
181          *             If one of cohorts failed can Commit
182          *
183          */
184         private void canCommitBlocking() throws TransactionCommitFailedException {
185             final Boolean canCommitResult = canCommitAll().checkedGet();
186             if (!canCommitResult) {
187                 throw new TransactionCommitFailedException("Can Commit failed, no detailed cause available.");
188             }
189         }
190
191         /**
192          *
193          * Invokes preCommit on underlying cohorts and blocks till
194          * all results are returned.
195          *
196          * Valid state transition is from CAN_COMMIT to PRE_COMMIT, if current
197          * state is not CAN_COMMIT
198          * throws IllegalStateException.
199          *
200          * @throws TransactionCommitFailedException
201          *             If one of cohorts failed preCommit
202          *
203          */
204         private void preCommitBlocking() throws TransactionCommitFailedException {
205             preCommitAll().checkedGet();
206         }
207
208         /**
209          *
210          * Invokes commit on underlying cohorts and blocks till
211          * all results are returned.
212          *
213          * Valid state transition is from PRE_COMMIT to COMMIT, if not throws
214          * IllegalStateException.
215          *
216          * @throws TransactionCommitFailedException
217          *             If one of cohorts failed preCommit
218          *
219          */
220         private RpcResult<TransactionStatus> commitBlocking() throws TransactionCommitFailedException {
221             commitAll().checkedGet();
222             return RpcResultBuilder.<TransactionStatus>success(TransactionStatus.COMMITED).build();
223         }
224
225         /**
226          * Aborts transaction.
227          *
228          * Invokes {@link DOMStoreThreePhaseCommitCohort#abort()} on all
229          * cohorts, blocks
230          * for all results. If any of the abort failed throws
231          * IllegalStateException,
232          * which will contains originalCause as suppressed Exception.
233          *
234          * If aborts we're successful throws supplied exception
235          *
236          * @param originalCause
237          *            Exception which should be used to fail transaction for
238          *            consumers of transaction
239          *            future and listeners of transaction failure.
240          * @throws TransactionCommitFailedException
241          *             on invocation of this method.
242          *             originalCa
243          * @throws IllegalStateException
244          *             if abort failed.
245          */
246         private void abortBlocking(final TransactionCommitFailedException originalCause)
247                 throws TransactionCommitFailedException {
248             LOG.warn("Tx: {} Error during phase {}, starting Abort", tx.getIdentifier(), currentPhase, originalCause);
249             Exception cause = originalCause;
250             try {
251                 abortAsyncAll().get();
252             } catch (InterruptedException | ExecutionException e) {
253                 LOG.error("Tx: {} Error during Abort.", tx.getIdentifier(), e);
254                 cause = new IllegalStateException("Abort failed.", e);
255                 cause.addSuppressed(e);
256             }
257             Throwables.propagateIfPossible(cause, TransactionCommitFailedException.class);
258         }
259
260         /**
261          *
262          * Invokes preCommit on underlying cohorts and returns future
263          * which will complete once all preCommit on cohorts completed or
264          * failed.
265          *
266          *
267          * Valid state transition is from CAN_COMMIT to PRE_COMMIT, if current
268          * state is not CAN_COMMIT
269          * throws IllegalStateException.
270          *
271          * @return Future which will complete once all cohorts completed
272          *         preCommit.
273          *         Future throws TransactionCommitFailedException
274          *         If any of cohorts failed preCommit
275          *
276          */
277         private CheckedFuture<Void, TransactionCommitFailedException> preCommitAll() {
278             changeStateFrom(CommitPhase.CAN_COMMIT, CommitPhase.PRE_COMMIT);
279             Builder<ListenableFuture<Void>> ops = ImmutableList.builder();
280             for (DOMStoreThreePhaseCommitCohort cohort : cohorts) {
281                 ops.add(cohort.preCommit());
282             }
283             /*
284              * We are returing all futures as list, not only succeeded ones in
285              * order to fail composite future if any of them failed.
286              * See Futures.allAsList for this description.
287              */
288             @SuppressWarnings({ "unchecked", "rawtypes" })
289             ListenableFuture<Void> compositeResult = (ListenableFuture) Futures.allAsList(ops.build());
290             return Futures.makeChecked(compositeResult, TransactionCommitFailedExceptionMapper.PRE_COMMIT_MAPPER);
291         }
292
293         /**
294          *
295          * Invokes commit on underlying cohorts and returns future which
296          * completes
297          * once all commits on cohorts are completed.
298          *
299          * Valid state transition is from PRE_COMMIT to COMMIT, if not throws
300          * IllegalStateException
301          *
302          * @return Future which will complete once all cohorts completed
303          *         commit.
304          *         Future throws TransactionCommitFailedException
305          *         If any of cohorts failed preCommit
306          *
307          */
308         private CheckedFuture<Void, TransactionCommitFailedException> commitAll() {
309             changeStateFrom(CommitPhase.PRE_COMMIT, CommitPhase.COMMIT);
310             Builder<ListenableFuture<Void>> ops = ImmutableList.builder();
311             for (DOMStoreThreePhaseCommitCohort cohort : cohorts) {
312                 ops.add(cohort.commit());
313             }
314             /*
315              * We are returing all futures as list, not only succeeded ones in
316              * order to fail composite future if any of them failed.
317              * See Futures.allAsList for this description.
318              */
319             @SuppressWarnings({ "unchecked", "rawtypes" })
320             ListenableFuture<Void> compositeResult = (ListenableFuture) Futures.allAsList(ops.build());
321             return Futures.makeChecked(compositeResult, TransactionCommitFailedExceptionMapper.COMMIT_ERROR_MAPPER);
322         }
323
324         /**
325          *
326          * Invokes canCommit on underlying cohorts and returns composite future
327          * which will contains {@link Boolean#TRUE} only and only if
328          * all cohorts returned true.
329          *
330          * Valid state transition is from SUBMITTED to CAN_COMMIT,
331          * if currentPhase is not SUBMITTED throws IllegalStateException.
332          *
333          * @return Future which will complete once all cohorts completed
334          *         preCommit.
335          *         Future throws TransactionCommitFailedException
336          *         If any of cohorts failed preCommit
337          *
338          */
339         private CheckedFuture<Boolean, TransactionCommitFailedException> canCommitAll() {
340             changeStateFrom(CommitPhase.SUBMITTED, CommitPhase.CAN_COMMIT);
341             Builder<ListenableFuture<Boolean>> canCommitOperations = ImmutableList.builder();
342             for (DOMStoreThreePhaseCommitCohort cohort : cohorts) {
343                 canCommitOperations.add(cohort.canCommit());
344             }
345             ListenableFuture<List<Boolean>> allCanCommits = Futures.allAsList(canCommitOperations.build());
346             ListenableFuture<Boolean> allSuccessFuture = Futures.transform(allCanCommits, AND_FUNCTION);
347             return Futures
348                     .makeChecked(allSuccessFuture, TransactionCommitFailedExceptionMapper.CAN_COMMIT_ERROR_MAPPER);
349
350         }
351
352         /**
353          *
354          * Invokes abort on underlying cohorts and returns future which
355          * completes
356          * once all abort on cohorts are completed.
357          *
358          * @return Future which will complete once all cohorts completed
359          *         abort.
360          *
361          */
362         private ListenableFuture<Void> abortAsyncAll() {
363             changeStateFrom(currentPhase, CommitPhase.ABORT);
364             Builder<ListenableFuture<Void>> ops = ImmutableList.builder();
365             for (DOMStoreThreePhaseCommitCohort cohort : cohorts) {
366                 ops.add(cohort.abort());
367             }
368             /*
369              * We are returing all futures as list, not only succeeded ones in
370              * order to fail composite future if any of them failed.
371              * See Futures.allAsList for this description.
372              */
373             @SuppressWarnings({ "unchecked", "rawtypes" })
374             ListenableFuture<Void> compositeResult = (ListenableFuture) Futures.allAsList(ops.build());
375             return compositeResult;
376         }
377
378         /**
379          * Change phase / state of transaction from expected value to new value
380          *
381          * This method checks state and updates state to new state of
382          * of this task if current state equals expected state.
383          * If expected state and current state are different raises
384          * IllegalStateException
385          * which means there is probably bug in implementation of commit
386          * coordination.
387          *
388          * If transition is successful, it logs transition on DEBUG level.
389          *
390          * @param currentExpected
391          *            Required phase for change of state
392          * @param newState
393          *            New Phase which will be entered by transaction.
394          * @throws IllegalStateException
395          *             If currentState of task does not match expected state
396          */
397         private synchronized void changeStateFrom(final CommitPhase currentExpected, final CommitPhase newState) {
398             Preconditions.checkState(currentPhase.equals(currentExpected),
399                     "Invalid state transition: Tx: %s current state: %s new state: %s", tx.getIdentifier(),
400                     currentPhase, newState);
401             LOG.debug("Transaction {}: Phase {} Started ", tx.getIdentifier(), newState);
402             currentPhase = newState;
403         };
404
405     }
406
407 }