Merge "Bug#1854 - Exit command in console causing OOM."
[controller.git] / opendaylight / md-sal / sal-dom-broker / src / main / java / org / opendaylight / controller / md / sal / dom / broker / impl / DOMDataCommitCoordinatorImpl.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
3  * This program and the accompanying materials are made available under the
4  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
5  * and is available at http://www.eclipse.org/legal/epl-v10.html
6  */
7 package org.opendaylight.controller.md.sal.dom.broker.impl;
8
9 import com.google.common.base.Optional;
10 import com.google.common.base.Preconditions;
11 import com.google.common.base.Throwables;
12 import com.google.common.collect.Iterables;
13 import com.google.common.util.concurrent.CheckedFuture;
14 import com.google.common.util.concurrent.Futures;
15 import com.google.common.util.concurrent.ListenableFuture;
16 import com.google.common.util.concurrent.ListeningExecutorService;
17 import java.util.concurrent.Callable;
18 import java.util.concurrent.ExecutionException;
19 import java.util.concurrent.RejectedExecutionException;
20 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
21 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
22 import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
23 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
24 import org.opendaylight.yangtools.util.DurationStatsTracker;
25 import org.opendaylight.yangtools.util.concurrent.MappingCheckedFuture;
26 import org.slf4j.Logger;
27 import org.slf4j.LoggerFactory;
28
29 /**
30  *
31  * Implementation of a blocking three-phase commit coordinator, which
32  * supports coordination of multiple {@link DOMStoreThreePhaseCommitCohort}s.
33  *
34  * This implementation does not support cancellation of a commit.
35  *
36  * In order to advance to the next phase of the three-phase commit, all subtasks
37  * of the previous step must have finished.
38  *
39  * This executor does not have an upper bound on subtask timeout.
40  *
41  *
42  */
43 public class DOMDataCommitCoordinatorImpl implements DOMDataCommitExecutor {
44
45     private static final Logger LOG = LoggerFactory.getLogger(DOMDataCommitCoordinatorImpl.class);
46     private final DurationStatsTracker commitStatsTracker = new DurationStatsTracker();
47     private final ListeningExecutorService executor;
48
49     /**
50      *
51      * Construct DOMDataCommitCoordinator which uses supplied executor to
52      * process commit coordinations.
53      *
54      * @param executor
55      */
56     public DOMDataCommitCoordinatorImpl(final ListeningExecutorService executor) {
57         this.executor = Preconditions.checkNotNull(executor, "executor must not be null.");
58     }
59
60     public DurationStatsTracker getCommitStatsTracker() {
61         return commitStatsTracker;
62     }
63
64     @Override
65     public CheckedFuture<Void,TransactionCommitFailedException> submit(final DOMDataWriteTransaction transaction,
66             final Iterable<DOMStoreThreePhaseCommitCohort> cohorts, final Optional<DOMDataCommitErrorListener> listener) {
67         Preconditions.checkArgument(transaction != null, "Transaction must not be null.");
68         Preconditions.checkArgument(cohorts != null, "Cohorts must not be null.");
69         Preconditions.checkArgument(listener != null, "Listener must not be null");
70         LOG.debug("Tx: {} is submitted for execution.", transaction.getIdentifier());
71
72         ListenableFuture<Void> commitFuture = null;
73         try {
74             commitFuture = executor.submit(new CommitCoordinationTask(transaction, cohorts,
75                     listener, commitStatsTracker));
76         } catch(RejectedExecutionException e) {
77             LOG.error("The commit executor's queue is full - submit task was rejected. \n" +
78                       executor, e);
79             return Futures.immediateFailedCheckedFuture(
80                     new TransactionCommitFailedException(
81                         "Could not submit the commit task - the commit queue capacity has been exceeded.", e));
82         }
83
84         if (listener.isPresent()) {
85             Futures.addCallback(commitFuture, new DOMDataCommitErrorInvoker(transaction, listener.get()));
86         }
87
88         return MappingCheckedFuture.create(commitFuture,
89                 TransactionCommitFailedExceptionMapper.COMMIT_ERROR_MAPPER);
90     }
91
92     /**
93      *
94      * Phase of 3PC commit
95      *
96      * Represents phase of 3PC Commit
97      *
98      *
99      */
100     private static enum CommitPhase {
101         /**
102          *
103          * Commit Coordination Task is submitted for executing
104          *
105          */
106         SUBMITTED,
107         /**
108          * Commit Coordination Task is in can commit phase of 3PC
109          *
110          */
111         CAN_COMMIT,
112         /**
113          * Commit Coordination Task is in pre-commit phase of 3PC
114          *
115          */
116         PRE_COMMIT,
117         /**
118          * Commit Coordination Task is in commit phase of 3PC
119          *
120          */
121         COMMIT,
122         /**
123          * Commit Coordination Task is in abort phase of 3PC
124          *
125          */
126         ABORT
127     }
128
129     /**
130      * Implementation of blocking three-phase commit-coordination tasks without
131      * support of cancellation.
132      */
133     private static final class CommitCoordinationTask implements Callable<Void> {
134         private static final AtomicReferenceFieldUpdater<CommitCoordinationTask, CommitPhase> PHASE_UPDATER =
135                 AtomicReferenceFieldUpdater.newUpdater(CommitCoordinationTask.class, CommitPhase.class, "currentPhase");
136         private final DOMDataWriteTransaction tx;
137         private final Iterable<DOMStoreThreePhaseCommitCohort> cohorts;
138         private final DurationStatsTracker commitStatTracker;
139         private final int cohortSize;
140         private volatile CommitPhase currentPhase = CommitPhase.SUBMITTED;
141
142         public CommitCoordinationTask(final DOMDataWriteTransaction transaction,
143                 final Iterable<DOMStoreThreePhaseCommitCohort> cohorts,
144                 final Optional<DOMDataCommitErrorListener> listener,
145                 final DurationStatsTracker commitStatTracker) {
146             this.tx = Preconditions.checkNotNull(transaction, "transaction must not be null");
147             this.cohorts = Preconditions.checkNotNull(cohorts, "cohorts must not be null");
148             this.commitStatTracker = commitStatTracker;
149             this.cohortSize = Iterables.size(cohorts);
150         }
151
152         @Override
153         public Void call() throws TransactionCommitFailedException {
154             final long startTime = commitStatTracker != null ? System.nanoTime() : 0;
155
156             try {
157                 canCommitBlocking();
158                 preCommitBlocking();
159                 commitBlocking();
160                 return null;
161             } catch (TransactionCommitFailedException e) {
162                 final CommitPhase phase = currentPhase;
163                 LOG.warn("Tx: {} Error during phase {}, starting Abort", tx.getIdentifier(), phase, e);
164                 abortBlocking(e, phase);
165                 throw e;
166             } finally {
167                 if (commitStatTracker != null) {
168                     commitStatTracker.addDuration(System.nanoTime() - startTime);
169                 }
170             }
171         }
172
173         /**
174          *
175          * Invokes canCommit on underlying cohorts and blocks till
176          * all results are returned.
177          *
178          * Valid state transition is from SUBMITTED to CAN_COMMIT,
179          * if currentPhase is not SUBMITTED throws IllegalStateException.
180          *
181          * @throws TransactionCommitFailedException
182          *             If one of cohorts failed can Commit
183          *
184          */
185         private void canCommitBlocking() throws TransactionCommitFailedException {
186             for (ListenableFuture<?> canCommit : canCommitAll()) {
187                 try {
188                     final Boolean result = (Boolean)canCommit.get();
189                     if (result == null || !result) {
190                         throw new TransactionCommitFailedException("Can Commit failed, no detailed cause available.");
191                     }
192                 } catch (InterruptedException | ExecutionException e) {
193                     throw TransactionCommitFailedExceptionMapper.CAN_COMMIT_ERROR_MAPPER.apply(e);
194                 }
195             }
196         }
197
198         /**
199          *
200          * Invokes canCommit on underlying cohorts and returns composite future
201          * which will contains {@link Boolean#TRUE} only and only if
202          * all cohorts returned true.
203          *
204          * Valid state transition is from SUBMITTED to CAN_COMMIT,
205          * if currentPhase is not SUBMITTED throws IllegalStateException.
206          *
207          * @return List of all cohorts futures from can commit phase.
208          *
209          */
210         private ListenableFuture<?>[] canCommitAll() {
211             changeStateFrom(CommitPhase.SUBMITTED, CommitPhase.CAN_COMMIT);
212
213             final ListenableFuture<?>[] ops = new ListenableFuture<?>[cohortSize];
214             int i = 0;
215             for (DOMStoreThreePhaseCommitCohort cohort : cohorts) {
216                 ops[i++] = cohort.canCommit();
217             }
218             return ops;
219         }
220
221         /**
222          *
223          * Invokes preCommit on underlying cohorts and blocks till
224          * all results are returned.
225          *
226          * Valid state transition is from CAN_COMMIT to PRE_COMMIT, if current
227          * state is not CAN_COMMIT
228          * throws IllegalStateException.
229          *
230          * @throws TransactionCommitFailedException
231          *             If one of cohorts failed preCommit
232          *
233          */
234         private void preCommitBlocking() throws TransactionCommitFailedException {
235             final ListenableFuture<?>[] preCommitFutures = preCommitAll();
236             try {
237                 for(ListenableFuture<?> future : preCommitFutures) {
238                     future.get();
239                 }
240             } catch (InterruptedException | ExecutionException e) {
241                 throw TransactionCommitFailedExceptionMapper.PRE_COMMIT_MAPPER.apply(e);
242             }
243         }
244
245         /**
246          *
247          * Invokes preCommit on underlying cohorts and returns future
248          * which will complete once all preCommit on cohorts completed or
249          * failed.
250          *
251          *
252          * Valid state transition is from CAN_COMMIT to PRE_COMMIT, if current
253          * state is not CAN_COMMIT
254          * throws IllegalStateException.
255          *
256          * @return List of all cohorts futures from can commit phase.
257          *
258          */
259         private ListenableFuture<?>[] preCommitAll() {
260             changeStateFrom(CommitPhase.CAN_COMMIT, CommitPhase.PRE_COMMIT);
261
262             final ListenableFuture<?>[] ops = new ListenableFuture<?>[cohortSize];
263             int i = 0;
264             for (DOMStoreThreePhaseCommitCohort cohort : cohorts) {
265                 ops[i++] = cohort.preCommit();
266             }
267             return ops;
268         }
269
270         /**
271          *
272          * Invokes commit on underlying cohorts and blocks till
273          * all results are returned.
274          *
275          * Valid state transition is from PRE_COMMIT to COMMIT, if not throws
276          * IllegalStateException.
277          *
278          * @throws TransactionCommitFailedException
279          *             If one of cohorts failed preCommit
280          *
281          */
282         private void commitBlocking() throws TransactionCommitFailedException {
283             final ListenableFuture<?>[] commitFutures = commitAll();
284             try {
285                 for(ListenableFuture<?> future : commitFutures) {
286                     future.get();
287                 }
288             } catch (InterruptedException | ExecutionException e) {
289                 throw TransactionCommitFailedExceptionMapper.COMMIT_ERROR_MAPPER.apply(e);
290             }
291         }
292
293         /**
294          *
295          * Invokes commit on underlying cohorts and returns future which
296          * completes
297          * once all commits on cohorts are completed.
298          *
299          * Valid state transition is from PRE_COMMIT to COMMIT, if not throws
300          * IllegalStateException
301          *
302          * @return List of all cohorts futures from can commit phase.
303          *
304          */
305         private ListenableFuture<?>[] commitAll() {
306             changeStateFrom(CommitPhase.PRE_COMMIT, CommitPhase.COMMIT);
307
308             final ListenableFuture<?>[] ops = new ListenableFuture<?>[cohortSize];
309             int i = 0;
310             for (DOMStoreThreePhaseCommitCohort cohort : cohorts) {
311                 ops[i++] = cohort.commit();
312             }
313             return ops;
314         }
315
316         /**
317          * Aborts transaction.
318          *
319          * Invokes {@link DOMStoreThreePhaseCommitCohort#abort()} on all
320          * cohorts, blocks
321          * for all results. If any of the abort failed throws
322          * IllegalStateException,
323          * which will contains originalCause as suppressed Exception.
324          *
325          * If aborts we're successful throws supplied exception
326          *
327          * @param originalCause
328          *            Exception which should be used to fail transaction for
329          *            consumers of transaction
330          *            future and listeners of transaction failure.
331          * @param phase phase in which the problem ensued
332          * @throws TransactionCommitFailedException
333          *             on invocation of this method.
334          *             originalCa
335          * @throws IllegalStateException
336          *             if abort failed.
337          */
338         private void abortBlocking(final TransactionCommitFailedException originalCause, final CommitPhase phase)
339                 throws TransactionCommitFailedException {
340             LOG.warn("Tx: {} Error during phase {}, starting Abort", tx.getIdentifier(), phase, originalCause);
341             Exception cause = originalCause;
342             try {
343                 abortAsyncAll(phase).get();
344             } catch (InterruptedException | ExecutionException e) {
345                 LOG.error("Tx: {} Error during Abort.", tx.getIdentifier(), e);
346                 cause = new IllegalStateException("Abort failed.", e);
347                 cause.addSuppressed(e);
348             }
349             Throwables.propagateIfPossible(cause, TransactionCommitFailedException.class);
350         }
351
352         /**
353          * Invokes abort on underlying cohorts and returns future which
354          * completes once all abort on cohorts are completed.
355          *
356          * @param phase phase in which the problem ensued
357          * @return Future which will complete once all cohorts completed
358          *         abort.
359          */
360         private ListenableFuture<Void> abortAsyncAll(final CommitPhase phase) {
361             changeStateFrom(phase, CommitPhase.ABORT);
362
363             final ListenableFuture<?>[] ops = new ListenableFuture<?>[cohortSize];
364             int i = 0;
365             for (DOMStoreThreePhaseCommitCohort cohort : cohorts) {
366                 ops[i++] = cohort.abort();
367             }
368
369             /*
370              * We are returning all futures as list, not only succeeded ones in
371              * order to fail composite future if any of them failed.
372              * See Futures.allAsList for this description.
373              */
374             @SuppressWarnings({ "unchecked", "rawtypes" })
375             ListenableFuture<Void> compositeResult = (ListenableFuture) Futures.allAsList(ops);
376             return compositeResult;
377         }
378
379         /**
380          * Change phase / state of transaction from expected value to new value
381          *
382          * This method checks state and updates state to new state of
383          * of this task if current state equals expected state.
384          * If expected state and current state are different raises
385          * IllegalStateException
386          * which means there is probably bug in implementation of commit
387          * coordination.
388          *
389          * If transition is successful, it logs transition on DEBUG level.
390          *
391          * @param currentExpected
392          *            Required phase for change of state
393          * @param newState
394          *            New Phase which will be entered by transaction.
395          * @throws IllegalStateException
396          *             If currentState of task does not match expected state
397          */
398         private void changeStateFrom(final CommitPhase currentExpected, final CommitPhase newState) {
399             final boolean success = PHASE_UPDATER.compareAndSet(this, currentExpected, newState);
400             Preconditions.checkState(success, "Invalid state transition: Tx: %s expected: %s current: %s target: %s",
401                 tx.getIdentifier(), currentExpected, currentPhase, newState);
402
403             LOG.debug("Transaction {}: Phase {} Started", tx.getIdentifier(), newState);
404         };
405     }
406
407 }