Bug#1854 - Exit command in console causing OOM.
[controller.git] / opendaylight / md-sal / sal-dom-broker / src / main / java / org / opendaylight / controller / md / sal / dom / broker / impl / DOMDataCommitCoordinatorImpl.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
3  * This program and the accompanying materials are made available under the
4  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
5  * and is available at http://www.eclipse.org/legal/epl-v10.html
6  */
7 package org.opendaylight.controller.md.sal.dom.broker.impl;
8
9 import java.util.List;
10 import java.util.concurrent.Callable;
11 import java.util.concurrent.ExecutionException;
12 import java.util.concurrent.RejectedExecutionException;
13
14 import javax.annotation.concurrent.GuardedBy;
15
16 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
17 import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
18 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
19 import org.opendaylight.yangtools.util.DurationStatsTracker;
20 import org.opendaylight.yangtools.util.concurrent.MappingCheckedFuture;
21 import org.slf4j.Logger;
22 import org.slf4j.LoggerFactory;
23
24 import com.google.common.base.Function;
25 import com.google.common.base.Optional;
26 import com.google.common.base.Preconditions;
27 import com.google.common.base.Throwables;
28 import com.google.common.collect.ImmutableList;
29 import com.google.common.collect.ImmutableList.Builder;
30 import com.google.common.util.concurrent.CheckedFuture;
31 import com.google.common.util.concurrent.Futures;
32 import com.google.common.util.concurrent.ListenableFuture;
33 import com.google.common.util.concurrent.ListeningExecutorService;
34
35 /**
36  *
37  * Implementation of blocking three phase commit coordinator, which which
38  * supports coordination on multiple {@link DOMStoreThreePhaseCommitCohort}.
39  *
40  * This implementation does not support cancelation of commit,
41  *
42  * In order to advance to next phase of three phase commit all subtasks of
43  * previous step must be finish.
44  *
45  * This executor does not have an upper bound on subtask timeout.
46  *
47  *
48  */
49 public class DOMDataCommitCoordinatorImpl implements DOMDataCommitExecutor {
50
51     private static final Logger LOG = LoggerFactory.getLogger(DOMDataCommitCoordinatorImpl.class);
52
53     /**
54      * Runs AND binary operation between all booleans in supplied iteration of booleans.
55      *
56      * This method will stop evaluating iterables if first found is false.
57      */
58     private static final Function<Iterable<Boolean>, Boolean> AND_FUNCTION = new Function<Iterable<Boolean>, Boolean>() {
59
60         @Override
61         public Boolean apply(final Iterable<Boolean> input) {
62             for(boolean value : input) {
63                if(!value) {
64                    return Boolean.FALSE;
65                }
66             }
67             return Boolean.TRUE;
68         }
69     };
70
71     private final ListeningExecutorService executor;
72
73     private final DurationStatsTracker commitStatsTracker = new DurationStatsTracker();
74
75     /**
76      *
77      * Construct DOMDataCommitCoordinator which uses supplied executor to
78      * process commit coordinations.
79      *
80      * @param executor
81      */
82     public DOMDataCommitCoordinatorImpl(final ListeningExecutorService executor) {
83         this.executor = Preconditions.checkNotNull(executor, "executor must not be null.");
84     }
85
86     public DurationStatsTracker getCommitStatsTracker() {
87         return commitStatsTracker;
88     }
89
90     @Override
91     public CheckedFuture<Void,TransactionCommitFailedException> submit(final DOMDataWriteTransaction transaction,
92             final Iterable<DOMStoreThreePhaseCommitCohort> cohorts, final Optional<DOMDataCommitErrorListener> listener) {
93         Preconditions.checkArgument(transaction != null, "Transaction must not be null.");
94         Preconditions.checkArgument(cohorts != null, "Cohorts must not be null.");
95         Preconditions.checkArgument(listener != null, "Listener must not be null");
96         LOG.debug("Tx: {} is submitted for execution.", transaction.getIdentifier());
97
98         ListenableFuture<Void> commitFuture = null;
99         try {
100             commitFuture = executor.submit(new CommitCoordinationTask(transaction, cohorts,
101                     listener, commitStatsTracker));
102         } catch(RejectedExecutionException e) {
103             LOG.error("The commit executor's queue is full - submit task was rejected. \n" +
104                       executor, e);
105             return Futures.immediateFailedCheckedFuture(
106                     new TransactionCommitFailedException(
107                         "Could not submit the commit task - the commit queue capacity has been exceeded.", e));
108         }
109
110         if (listener.isPresent()) {
111             Futures.addCallback(commitFuture, new DOMDataCommitErrorInvoker(transaction, listener.get()));
112         }
113
114         return MappingCheckedFuture.create(commitFuture,
115                 TransactionCommitFailedExceptionMapper.COMMIT_ERROR_MAPPER);
116     }
117
118     /**
119      *
120      * Phase of 3PC commit
121      *
122      * Represents phase of 3PC Commit
123      *
124      *
125      */
126     private static enum CommitPhase {
127         /**
128          *
129          * Commit Coordination Task is submitted for executing
130          *
131          */
132         SUBMITTED,
133         /**
134          * Commit Coordination Task is in can commit phase of 3PC
135          *
136          */
137         CAN_COMMIT,
138         /**
139          * Commit Coordination Task is in pre-commit phase of 3PC
140          *
141          */
142         PRE_COMMIT,
143         /**
144          * Commit Coordination Task is in commit phase of 3PC
145          *
146          */
147         COMMIT,
148         /**
149          * Commit Coordination Task is in abort phase of 3PC
150          *
151          */
152         ABORT
153     }
154
155     /**
156      *
157      * Implementation of blocking three-phase commit-coordination tasks without
158      * support of cancelation.
159      *
160      */
161     private static class CommitCoordinationTask implements Callable<Void> {
162
163         private final DOMDataWriteTransaction tx;
164         private final Iterable<DOMStoreThreePhaseCommitCohort> cohorts;
165         private final DurationStatsTracker commitStatTracker;
166
167         @GuardedBy("this")
168         private CommitPhase currentPhase;
169
170         public CommitCoordinationTask(final DOMDataWriteTransaction transaction,
171                 final Iterable<DOMStoreThreePhaseCommitCohort> cohorts,
172                 final Optional<DOMDataCommitErrorListener> listener,
173                 final DurationStatsTracker commitStatTracker) {
174             this.tx = Preconditions.checkNotNull(transaction, "transaction must not be null");
175             this.cohorts = Preconditions.checkNotNull(cohorts, "cohorts must not be null");
176             this.currentPhase = CommitPhase.SUBMITTED;
177             this.commitStatTracker = commitStatTracker;
178         }
179
180         @Override
181         public Void call() throws TransactionCommitFailedException {
182
183             long startTime = System.nanoTime();
184             try {
185                 canCommitBlocking();
186                 preCommitBlocking();
187                 commitBlocking();
188                 return null;
189             } catch (TransactionCommitFailedException e) {
190                 LOG.warn("Tx: {} Error during phase {}, starting Abort", tx.getIdentifier(), currentPhase, e);
191                 abortBlocking(e);
192                 throw e;
193             } finally {
194                 if(commitStatTracker != null) {
195                     commitStatTracker.addDuration(System.nanoTime() - startTime);
196                 }
197             }
198         }
199
200         /**
201          *
202          * Invokes canCommit on underlying cohorts and blocks till
203          * all results are returned.
204          *
205          * Valid state transition is from SUBMITTED to CAN_COMMIT,
206          * if currentPhase is not SUBMITTED throws IllegalStateException.
207          *
208          * @throws TransactionCommitFailedException
209          *             If one of cohorts failed can Commit
210          *
211          */
212         private void canCommitBlocking() throws TransactionCommitFailedException {
213             final Boolean canCommitResult = canCommitAll().checkedGet();
214             if (!canCommitResult) {
215                 throw new TransactionCommitFailedException("Can Commit failed, no detailed cause available.");
216             }
217         }
218
219         /**
220          *
221          * Invokes preCommit on underlying cohorts and blocks till
222          * all results are returned.
223          *
224          * Valid state transition is from CAN_COMMIT to PRE_COMMIT, if current
225          * state is not CAN_COMMIT
226          * throws IllegalStateException.
227          *
228          * @throws TransactionCommitFailedException
229          *             If one of cohorts failed preCommit
230          *
231          */
232         private void preCommitBlocking() throws TransactionCommitFailedException {
233             preCommitAll().checkedGet();
234         }
235
236         /**
237          *
238          * Invokes commit on underlying cohorts and blocks till
239          * all results are returned.
240          *
241          * Valid state transition is from PRE_COMMIT to COMMIT, if not throws
242          * IllegalStateException.
243          *
244          * @throws TransactionCommitFailedException
245          *             If one of cohorts failed preCommit
246          *
247          */
248         private void commitBlocking() throws TransactionCommitFailedException {
249             commitAll().checkedGet();
250         }
251
252         /**
253          * Aborts transaction.
254          *
255          * Invokes {@link DOMStoreThreePhaseCommitCohort#abort()} on all
256          * cohorts, blocks
257          * for all results. If any of the abort failed throws
258          * IllegalStateException,
259          * which will contains originalCause as suppressed Exception.
260          *
261          * If aborts we're successful throws supplied exception
262          *
263          * @param originalCause
264          *            Exception which should be used to fail transaction for
265          *            consumers of transaction
266          *            future and listeners of transaction failure.
267          * @throws TransactionCommitFailedException
268          *             on invocation of this method.
269          *             originalCa
270          * @throws IllegalStateException
271          *             if abort failed.
272          */
273         private void abortBlocking(final TransactionCommitFailedException originalCause)
274                 throws TransactionCommitFailedException {
275             LOG.warn("Tx: {} Error during phase {}, starting Abort", tx.getIdentifier(), currentPhase, originalCause);
276             Exception cause = originalCause;
277             try {
278                 abortAsyncAll().get();
279             } catch (InterruptedException | ExecutionException e) {
280                 LOG.error("Tx: {} Error during Abort.", tx.getIdentifier(), e);
281                 cause = new IllegalStateException("Abort failed.", e);
282                 cause.addSuppressed(e);
283             }
284             Throwables.propagateIfPossible(cause, TransactionCommitFailedException.class);
285         }
286
287         /**
288          *
289          * Invokes preCommit on underlying cohorts and returns future
290          * which will complete once all preCommit on cohorts completed or
291          * failed.
292          *
293          *
294          * Valid state transition is from CAN_COMMIT to PRE_COMMIT, if current
295          * state is not CAN_COMMIT
296          * throws IllegalStateException.
297          *
298          * @return Future which will complete once all cohorts completed
299          *         preCommit.
300          *         Future throws TransactionCommitFailedException
301          *         If any of cohorts failed preCommit
302          *
303          */
304         private CheckedFuture<Void, TransactionCommitFailedException> preCommitAll() {
305             changeStateFrom(CommitPhase.CAN_COMMIT, CommitPhase.PRE_COMMIT);
306             Builder<ListenableFuture<Void>> ops = ImmutableList.builder();
307             for (DOMStoreThreePhaseCommitCohort cohort : cohorts) {
308                 ops.add(cohort.preCommit());
309             }
310             /*
311              * We are returing all futures as list, not only succeeded ones in
312              * order to fail composite future if any of them failed.
313              * See Futures.allAsList for this description.
314              */
315             @SuppressWarnings({ "unchecked", "rawtypes" })
316             ListenableFuture<Void> compositeResult = (ListenableFuture) Futures.allAsList(ops.build());
317             return MappingCheckedFuture.create(compositeResult,
318                                          TransactionCommitFailedExceptionMapper.PRE_COMMIT_MAPPER);
319         }
320
321         /**
322          *
323          * Invokes commit on underlying cohorts and returns future which
324          * completes
325          * once all commits on cohorts are completed.
326          *
327          * Valid state transition is from PRE_COMMIT to COMMIT, if not throws
328          * IllegalStateException
329          *
330          * @return Future which will complete once all cohorts completed
331          *         commit.
332          *         Future throws TransactionCommitFailedException
333          *         If any of cohorts failed preCommit
334          *
335          */
336         private CheckedFuture<Void, TransactionCommitFailedException> commitAll() {
337             changeStateFrom(CommitPhase.PRE_COMMIT, CommitPhase.COMMIT);
338             Builder<ListenableFuture<Void>> ops = ImmutableList.builder();
339             for (DOMStoreThreePhaseCommitCohort cohort : cohorts) {
340                 ops.add(cohort.commit());
341             }
342             /*
343              * We are returing all futures as list, not only succeeded ones in
344              * order to fail composite future if any of them failed.
345              * See Futures.allAsList for this description.
346              */
347             @SuppressWarnings({ "unchecked", "rawtypes" })
348             ListenableFuture<Void> compositeResult = (ListenableFuture) Futures.allAsList(ops.build());
349             return MappingCheckedFuture.create(compositeResult,
350                                      TransactionCommitFailedExceptionMapper.COMMIT_ERROR_MAPPER);
351         }
352
353         /**
354          *
355          * Invokes canCommit on underlying cohorts and returns composite future
356          * which will contains {@link Boolean#TRUE} only and only if
357          * all cohorts returned true.
358          *
359          * Valid state transition is from SUBMITTED to CAN_COMMIT,
360          * if currentPhase is not SUBMITTED throws IllegalStateException.
361          *
362          * @return Future which will complete once all cohorts completed
363          *         preCommit.
364          *         Future throws TransactionCommitFailedException
365          *         If any of cohorts failed preCommit
366          *
367          */
368         private CheckedFuture<Boolean, TransactionCommitFailedException> canCommitAll() {
369             changeStateFrom(CommitPhase.SUBMITTED, CommitPhase.CAN_COMMIT);
370             Builder<ListenableFuture<Boolean>> canCommitOperations = ImmutableList.builder();
371             for (DOMStoreThreePhaseCommitCohort cohort : cohorts) {
372                 canCommitOperations.add(cohort.canCommit());
373             }
374             ListenableFuture<List<Boolean>> allCanCommits = Futures.allAsList(canCommitOperations.build());
375             ListenableFuture<Boolean> allSuccessFuture = Futures.transform(allCanCommits, AND_FUNCTION);
376             return MappingCheckedFuture.create(allSuccessFuture,
377                                        TransactionCommitFailedExceptionMapper.CAN_COMMIT_ERROR_MAPPER);
378
379         }
380
381         /**
382          *
383          * Invokes abort on underlying cohorts and returns future which
384          * completes
385          * once all abort on cohorts are completed.
386          *
387          * @return Future which will complete once all cohorts completed
388          *         abort.
389          *
390          */
391         private ListenableFuture<Void> abortAsyncAll() {
392             changeStateFrom(currentPhase, CommitPhase.ABORT);
393             Builder<ListenableFuture<Void>> ops = ImmutableList.builder();
394             for (DOMStoreThreePhaseCommitCohort cohort : cohorts) {
395                 ops.add(cohort.abort());
396             }
397             /*
398              * We are returing all futures as list, not only succeeded ones in
399              * order to fail composite future if any of them failed.
400              * See Futures.allAsList for this description.
401              */
402             @SuppressWarnings({ "unchecked", "rawtypes" })
403             ListenableFuture<Void> compositeResult = (ListenableFuture) Futures.allAsList(ops.build());
404             return compositeResult;
405         }
406
407         /**
408          * Change phase / state of transaction from expected value to new value
409          *
410          * This method checks state and updates state to new state of
411          * of this task if current state equals expected state.
412          * If expected state and current state are different raises
413          * IllegalStateException
414          * which means there is probably bug in implementation of commit
415          * coordination.
416          *
417          * If transition is successful, it logs transition on DEBUG level.
418          *
419          * @param currentExpected
420          *            Required phase for change of state
421          * @param newState
422          *            New Phase which will be entered by transaction.
423          * @throws IllegalStateException
424          *             If currentState of task does not match expected state
425          */
426         private synchronized void changeStateFrom(final CommitPhase currentExpected, final CommitPhase newState) {
427             Preconditions.checkState(currentPhase.equals(currentExpected),
428                     "Invalid state transition: Tx: %s current state: %s new state: %s", tx.getIdentifier(),
429                     currentPhase, newState);
430             LOG.debug("Transaction {}: Phase {} Started ", tx.getIdentifier(), newState);
431             currentPhase = newState;
432         };
433
434     }
435
436 }