Bug 1430: Off-load notifications from single commit thread
[controller.git] / opendaylight / md-sal / sal-dom-broker / src / main / java / org / opendaylight / controller / md / sal / dom / broker / impl / DOMDataCommitCoordinatorImpl.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
3  * This program and the accompanying materials are made available under the
4  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
5  * and is available at http://www.eclipse.org/legal/epl-v10.html
6  */
7 package org.opendaylight.controller.md.sal.dom.broker.impl;
8
9 import java.util.List;
10 import java.util.concurrent.Callable;
11 import java.util.concurrent.ExecutionException;
12 import java.util.concurrent.RejectedExecutionException;
13
14 import javax.annotation.concurrent.GuardedBy;
15
16 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
17 import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
18 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
19 import org.opendaylight.yangtools.util.concurrent.MappingCheckedFuture;
20 import org.slf4j.Logger;
21 import org.slf4j.LoggerFactory;
22
23 import com.google.common.base.Function;
24 import com.google.common.base.Optional;
25 import com.google.common.base.Preconditions;
26 import com.google.common.base.Throwables;
27 import com.google.common.collect.ImmutableList;
28 import com.google.common.collect.ImmutableList.Builder;
29 import com.google.common.util.concurrent.CheckedFuture;
30 import com.google.common.util.concurrent.Futures;
31 import com.google.common.util.concurrent.ListenableFuture;
32 import com.google.common.util.concurrent.ListeningExecutorService;
33
34 /**
35  *
36  * Implementation of blocking three phase commit coordinator, which which
37  * supports coordination on multiple {@link DOMStoreThreePhaseCommitCohort}.
38  *
39  * This implementation does not support cancelation of commit,
40  *
41  * In order to advance to next phase of three phase commit all subtasks of
42  * previous step must be finish.
43  *
44  * This executor does not have an upper bound on subtask timeout.
45  *
46  *
47  */
48 public class DOMDataCommitCoordinatorImpl implements DOMDataCommitExecutor {
49
50     private static final Logger LOG = LoggerFactory.getLogger(DOMDataCommitCoordinatorImpl.class);
51
52     /**
53      * Runs AND binary operation between all booleans in supplied iteration of booleans.
54      *
55      * This method will stop evaluating iterables if first found is false.
56      */
57     private static final Function<Iterable<Boolean>, Boolean> AND_FUNCTION = new Function<Iterable<Boolean>, Boolean>() {
58
59         @Override
60         public Boolean apply(final Iterable<Boolean> input) {
61             for(boolean value : input) {
62                if(!value) {
63                    return Boolean.FALSE;
64                }
65             }
66             return Boolean.TRUE;
67         }
68     };
69
70     private final ListeningExecutorService executor;
71
72     /**
73      *
74      * Construct DOMDataCommitCoordinator which uses supplied executor to
75      * process commit coordinations.
76      *
77      * @param executor
78      */
79     public DOMDataCommitCoordinatorImpl(final ListeningExecutorService executor) {
80         this.executor = Preconditions.checkNotNull(executor, "executor must not be null.");
81     }
82
83     @Override
84     public CheckedFuture<Void,TransactionCommitFailedException> submit(final DOMDataWriteTransaction transaction,
85             final Iterable<DOMStoreThreePhaseCommitCohort> cohorts, final Optional<DOMDataCommitErrorListener> listener) {
86         Preconditions.checkArgument(transaction != null, "Transaction must not be null.");
87         Preconditions.checkArgument(cohorts != null, "Cohorts must not be null.");
88         Preconditions.checkArgument(listener != null, "Listener must not be null");
89         LOG.debug("Tx: {} is submitted for execution.", transaction.getIdentifier());
90
91         ListenableFuture<Void> commitFuture = null;
92         try {
93             commitFuture = executor.submit(new CommitCoordinationTask(transaction, cohorts, listener));
94         } catch(RejectedExecutionException e) {
95             LOG.error("The commit executor's queue is full - submit task was rejected. \n" +
96                       executor, e);
97             return Futures.immediateFailedCheckedFuture(
98                     new TransactionCommitFailedException(
99                         "Could not submit the commit task - the commit queue capacity has been exceeded.", e));
100         }
101
102         if (listener.isPresent()) {
103             Futures.addCallback(commitFuture, new DOMDataCommitErrorInvoker(transaction, listener.get()));
104         }
105
106         return MappingCheckedFuture.create(commitFuture,
107                 TransactionCommitFailedExceptionMapper.COMMIT_ERROR_MAPPER);
108     }
109
110     /**
111      *
112      * Phase of 3PC commit
113      *
114      * Represents phase of 3PC Commit
115      *
116      *
117      */
118     private static enum CommitPhase {
119         /**
120          *
121          * Commit Coordination Task is submitted for executing
122          *
123          */
124         SUBMITTED,
125         /**
126          * Commit Coordination Task is in can commit phase of 3PC
127          *
128          */
129         CAN_COMMIT,
130         /**
131          * Commit Coordination Task is in pre-commit phase of 3PC
132          *
133          */
134         PRE_COMMIT,
135         /**
136          * Commit Coordination Task is in commit phase of 3PC
137          *
138          */
139         COMMIT,
140         /**
141          * Commit Coordination Task is in abort phase of 3PC
142          *
143          */
144         ABORT
145     }
146
147     /**
148      *
149      * Implementation of blocking three-phase commit-coordination tasks without
150      * support of cancelation.
151      *
152      */
153     private static class CommitCoordinationTask implements Callable<Void> {
154
155         private final DOMDataWriteTransaction tx;
156         private final Iterable<DOMStoreThreePhaseCommitCohort> cohorts;
157
158         @GuardedBy("this")
159         private CommitPhase currentPhase;
160
161         public CommitCoordinationTask(final DOMDataWriteTransaction transaction,
162                 final Iterable<DOMStoreThreePhaseCommitCohort> cohorts,
163                 final Optional<DOMDataCommitErrorListener> listener) {
164             this.tx = Preconditions.checkNotNull(transaction, "transaction must not be null");
165             this.cohorts = Preconditions.checkNotNull(cohorts, "cohorts must not be null");
166             this.currentPhase = CommitPhase.SUBMITTED;
167         }
168
169         @Override
170         public Void call() throws TransactionCommitFailedException {
171
172             try {
173                 canCommitBlocking();
174                 preCommitBlocking();
175                 commitBlocking();
176                 return null;
177             } catch (TransactionCommitFailedException e) {
178                 LOG.warn("Tx: {} Error during phase {}, starting Abort", tx.getIdentifier(), currentPhase, e);
179                 abortBlocking(e);
180                 throw e;
181             }
182         }
183
184         /**
185          *
186          * Invokes canCommit on underlying cohorts and blocks till
187          * all results are returned.
188          *
189          * Valid state transition is from SUBMITTED to CAN_COMMIT,
190          * if currentPhase is not SUBMITTED throws IllegalStateException.
191          *
192          * @throws TransactionCommitFailedException
193          *             If one of cohorts failed can Commit
194          *
195          */
196         private void canCommitBlocking() throws TransactionCommitFailedException {
197             final Boolean canCommitResult = canCommitAll().checkedGet();
198             if (!canCommitResult) {
199                 throw new TransactionCommitFailedException("Can Commit failed, no detailed cause available.");
200             }
201         }
202
203         /**
204          *
205          * Invokes preCommit on underlying cohorts and blocks till
206          * all results are returned.
207          *
208          * Valid state transition is from CAN_COMMIT to PRE_COMMIT, if current
209          * state is not CAN_COMMIT
210          * throws IllegalStateException.
211          *
212          * @throws TransactionCommitFailedException
213          *             If one of cohorts failed preCommit
214          *
215          */
216         private void preCommitBlocking() throws TransactionCommitFailedException {
217             preCommitAll().checkedGet();
218         }
219
220         /**
221          *
222          * Invokes commit on underlying cohorts and blocks till
223          * all results are returned.
224          *
225          * Valid state transition is from PRE_COMMIT to COMMIT, if not throws
226          * IllegalStateException.
227          *
228          * @throws TransactionCommitFailedException
229          *             If one of cohorts failed preCommit
230          *
231          */
232         private void commitBlocking() throws TransactionCommitFailedException {
233             commitAll().checkedGet();
234         }
235
236         /**
237          * Aborts transaction.
238          *
239          * Invokes {@link DOMStoreThreePhaseCommitCohort#abort()} on all
240          * cohorts, blocks
241          * for all results. If any of the abort failed throws
242          * IllegalStateException,
243          * which will contains originalCause as suppressed Exception.
244          *
245          * If aborts we're successful throws supplied exception
246          *
247          * @param originalCause
248          *            Exception which should be used to fail transaction for
249          *            consumers of transaction
250          *            future and listeners of transaction failure.
251          * @throws TransactionCommitFailedException
252          *             on invocation of this method.
253          *             originalCa
254          * @throws IllegalStateException
255          *             if abort failed.
256          */
257         private void abortBlocking(final TransactionCommitFailedException originalCause)
258                 throws TransactionCommitFailedException {
259             LOG.warn("Tx: {} Error during phase {}, starting Abort", tx.getIdentifier(), currentPhase, originalCause);
260             Exception cause = originalCause;
261             try {
262                 abortAsyncAll().get();
263             } catch (InterruptedException | ExecutionException e) {
264                 LOG.error("Tx: {} Error during Abort.", tx.getIdentifier(), e);
265                 cause = new IllegalStateException("Abort failed.", e);
266                 cause.addSuppressed(e);
267             }
268             Throwables.propagateIfPossible(cause, TransactionCommitFailedException.class);
269         }
270
271         /**
272          *
273          * Invokes preCommit on underlying cohorts and returns future
274          * which will complete once all preCommit on cohorts completed or
275          * failed.
276          *
277          *
278          * Valid state transition is from CAN_COMMIT to PRE_COMMIT, if current
279          * state is not CAN_COMMIT
280          * throws IllegalStateException.
281          *
282          * @return Future which will complete once all cohorts completed
283          *         preCommit.
284          *         Future throws TransactionCommitFailedException
285          *         If any of cohorts failed preCommit
286          *
287          */
288         private CheckedFuture<Void, TransactionCommitFailedException> preCommitAll() {
289             changeStateFrom(CommitPhase.CAN_COMMIT, CommitPhase.PRE_COMMIT);
290             Builder<ListenableFuture<Void>> ops = ImmutableList.builder();
291             for (DOMStoreThreePhaseCommitCohort cohort : cohorts) {
292                 ops.add(cohort.preCommit());
293             }
294             /*
295              * We are returing all futures as list, not only succeeded ones in
296              * order to fail composite future if any of them failed.
297              * See Futures.allAsList for this description.
298              */
299             @SuppressWarnings({ "unchecked", "rawtypes" })
300             ListenableFuture<Void> compositeResult = (ListenableFuture) Futures.allAsList(ops.build());
301             return MappingCheckedFuture.create(compositeResult,
302                                          TransactionCommitFailedExceptionMapper.PRE_COMMIT_MAPPER);
303         }
304
305         /**
306          *
307          * Invokes commit on underlying cohorts and returns future which
308          * completes
309          * once all commits on cohorts are completed.
310          *
311          * Valid state transition is from PRE_COMMIT to COMMIT, if not throws
312          * IllegalStateException
313          *
314          * @return Future which will complete once all cohorts completed
315          *         commit.
316          *         Future throws TransactionCommitFailedException
317          *         If any of cohorts failed preCommit
318          *
319          */
320         private CheckedFuture<Void, TransactionCommitFailedException> commitAll() {
321             changeStateFrom(CommitPhase.PRE_COMMIT, CommitPhase.COMMIT);
322             Builder<ListenableFuture<Void>> ops = ImmutableList.builder();
323             for (DOMStoreThreePhaseCommitCohort cohort : cohorts) {
324                 ops.add(cohort.commit());
325             }
326             /*
327              * We are returing all futures as list, not only succeeded ones in
328              * order to fail composite future if any of them failed.
329              * See Futures.allAsList for this description.
330              */
331             @SuppressWarnings({ "unchecked", "rawtypes" })
332             ListenableFuture<Void> compositeResult = (ListenableFuture) Futures.allAsList(ops.build());
333             return MappingCheckedFuture.create(compositeResult,
334                                      TransactionCommitFailedExceptionMapper.COMMIT_ERROR_MAPPER);
335         }
336
337         /**
338          *
339          * Invokes canCommit on underlying cohorts and returns composite future
340          * which will contains {@link Boolean#TRUE} only and only if
341          * all cohorts returned true.
342          *
343          * Valid state transition is from SUBMITTED to CAN_COMMIT,
344          * if currentPhase is not SUBMITTED throws IllegalStateException.
345          *
346          * @return Future which will complete once all cohorts completed
347          *         preCommit.
348          *         Future throws TransactionCommitFailedException
349          *         If any of cohorts failed preCommit
350          *
351          */
352         private CheckedFuture<Boolean, TransactionCommitFailedException> canCommitAll() {
353             changeStateFrom(CommitPhase.SUBMITTED, CommitPhase.CAN_COMMIT);
354             Builder<ListenableFuture<Boolean>> canCommitOperations = ImmutableList.builder();
355             for (DOMStoreThreePhaseCommitCohort cohort : cohorts) {
356                 canCommitOperations.add(cohort.canCommit());
357             }
358             ListenableFuture<List<Boolean>> allCanCommits = Futures.allAsList(canCommitOperations.build());
359             ListenableFuture<Boolean> allSuccessFuture = Futures.transform(allCanCommits, AND_FUNCTION);
360             return MappingCheckedFuture.create(allSuccessFuture,
361                                        TransactionCommitFailedExceptionMapper.CAN_COMMIT_ERROR_MAPPER);
362
363         }
364
365         /**
366          *
367          * Invokes abort on underlying cohorts and returns future which
368          * completes
369          * once all abort on cohorts are completed.
370          *
371          * @return Future which will complete once all cohorts completed
372          *         abort.
373          *
374          */
375         private ListenableFuture<Void> abortAsyncAll() {
376             changeStateFrom(currentPhase, CommitPhase.ABORT);
377             Builder<ListenableFuture<Void>> ops = ImmutableList.builder();
378             for (DOMStoreThreePhaseCommitCohort cohort : cohorts) {
379                 ops.add(cohort.abort());
380             }
381             /*
382              * We are returing all futures as list, not only succeeded ones in
383              * order to fail composite future if any of them failed.
384              * See Futures.allAsList for this description.
385              */
386             @SuppressWarnings({ "unchecked", "rawtypes" })
387             ListenableFuture<Void> compositeResult = (ListenableFuture) Futures.allAsList(ops.build());
388             return compositeResult;
389         }
390
391         /**
392          * Change phase / state of transaction from expected value to new value
393          *
394          * This method checks state and updates state to new state of
395          * of this task if current state equals expected state.
396          * If expected state and current state are different raises
397          * IllegalStateException
398          * which means there is probably bug in implementation of commit
399          * coordination.
400          *
401          * If transition is successful, it logs transition on DEBUG level.
402          *
403          * @param currentExpected
404          *            Required phase for change of state
405          * @param newState
406          *            New Phase which will be entered by transaction.
407          * @throws IllegalStateException
408          *             If currentState of task does not match expected state
409          */
410         private synchronized void changeStateFrom(final CommitPhase currentExpected, final CommitPhase newState) {
411             Preconditions.checkState(currentPhase.equals(currentExpected),
412                     "Invalid state transition: Tx: %s current state: %s new state: %s", tx.getIdentifier(),
413                     currentPhase, newState);
414             LOG.debug("Transaction {}: Phase {} Started ", tx.getIdentifier(), newState);
415             currentPhase = newState;
416         };
417
418     }
419
420 }