Bug 650: Removed uncessary wrapping and allocation of futures
[controller.git] / opendaylight / md-sal / sal-dom-broker / src / main / java / org / opendaylight / controller / md / sal / dom / broker / impl / DOMDataCommitCoordinatorImpl.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
3  * This program and the accompanying materials are made available under the
4  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
5  * and is available at http://www.eclipse.org/legal/epl-v10.html
6  */
7 package org.opendaylight.controller.md.sal.dom.broker.impl;
8
9 import com.google.common.base.Optional;
10 import com.google.common.base.Preconditions;
11 import com.google.common.base.Throwables;
12 import com.google.common.collect.Iterables;
13 import com.google.common.util.concurrent.CheckedFuture;
14 import com.google.common.util.concurrent.Futures;
15 import com.google.common.util.concurrent.ListenableFuture;
16 import com.google.common.util.concurrent.ListeningExecutorService;
17 import java.util.concurrent.Callable;
18 import java.util.concurrent.ExecutionException;
19 import java.util.concurrent.RejectedExecutionException;
20 import javax.annotation.concurrent.GuardedBy;
21 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
22 import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
23 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
24 import org.opendaylight.yangtools.util.DurationStatsTracker;
25 import org.opendaylight.yangtools.util.concurrent.MappingCheckedFuture;
26 import org.slf4j.Logger;
27 import org.slf4j.LoggerFactory;
28
29 /**
30  *
31  * Implementation of blocking three phase commit coordinator, which which
32  * supports coordination on multiple {@link DOMStoreThreePhaseCommitCohort}.
33  *
34  * This implementation does not support cancelation of commit,
35  *
36  * In order to advance to next phase of three phase commit all subtasks of
37  * previous step must be finish.
38  *
39  * This executor does not have an upper bound on subtask timeout.
40  *
41  *
42  */
43 public class DOMDataCommitCoordinatorImpl implements DOMDataCommitExecutor {
44
45     private static final Logger LOG = LoggerFactory.getLogger(DOMDataCommitCoordinatorImpl.class);
46     private final DurationStatsTracker commitStatsTracker = new DurationStatsTracker();
47     private final ListeningExecutorService executor;
48
49     /**
50      *
51      * Construct DOMDataCommitCoordinator which uses supplied executor to
52      * process commit coordinations.
53      *
54      * @param executor
55      */
56     public DOMDataCommitCoordinatorImpl(final ListeningExecutorService executor) {
57         this.executor = Preconditions.checkNotNull(executor, "executor must not be null.");
58     }
59
60     public DurationStatsTracker getCommitStatsTracker() {
61         return commitStatsTracker;
62     }
63
64     @Override
65     public CheckedFuture<Void,TransactionCommitFailedException> submit(final DOMDataWriteTransaction transaction,
66             final Iterable<DOMStoreThreePhaseCommitCohort> cohorts, final Optional<DOMDataCommitErrorListener> listener) {
67         Preconditions.checkArgument(transaction != null, "Transaction must not be null.");
68         Preconditions.checkArgument(cohorts != null, "Cohorts must not be null.");
69         Preconditions.checkArgument(listener != null, "Listener must not be null");
70         LOG.debug("Tx: {} is submitted for execution.", transaction.getIdentifier());
71
72         ListenableFuture<Void> commitFuture = null;
73         try {
74             commitFuture = executor.submit(new CommitCoordinationTask(transaction, cohorts,
75                     listener, commitStatsTracker));
76         } catch(RejectedExecutionException e) {
77             LOG.error("The commit executor's queue is full - submit task was rejected. \n" +
78                       executor, e);
79             return Futures.immediateFailedCheckedFuture(
80                     new TransactionCommitFailedException(
81                         "Could not submit the commit task - the commit queue capacity has been exceeded.", e));
82         }
83
84         if (listener.isPresent()) {
85             Futures.addCallback(commitFuture, new DOMDataCommitErrorInvoker(transaction, listener.get()));
86         }
87
88         return MappingCheckedFuture.create(commitFuture,
89                 TransactionCommitFailedExceptionMapper.COMMIT_ERROR_MAPPER);
90     }
91
92     /**
93      *
94      * Phase of 3PC commit
95      *
96      * Represents phase of 3PC Commit
97      *
98      *
99      */
100     private static enum CommitPhase {
101         /**
102          *
103          * Commit Coordination Task is submitted for executing
104          *
105          */
106         SUBMITTED,
107         /**
108          * Commit Coordination Task is in can commit phase of 3PC
109          *
110          */
111         CAN_COMMIT,
112         /**
113          * Commit Coordination Task is in pre-commit phase of 3PC
114          *
115          */
116         PRE_COMMIT,
117         /**
118          * Commit Coordination Task is in commit phase of 3PC
119          *
120          */
121         COMMIT,
122         /**
123          * Commit Coordination Task is in abort phase of 3PC
124          *
125          */
126         ABORT
127     }
128
129     /**
130      *
131      * Implementation of blocking three-phase commit-coordination tasks without
132      * support of cancelation.
133      *
134      */
135     private static class CommitCoordinationTask implements Callable<Void> {
136
137         private final DOMDataWriteTransaction tx;
138         private final Iterable<DOMStoreThreePhaseCommitCohort> cohorts;
139         private final DurationStatsTracker commitStatTracker;
140         private final int cohortSize;
141
142         @GuardedBy("this")
143         private CommitPhase currentPhase;
144
145         public CommitCoordinationTask(final DOMDataWriteTransaction transaction,
146                 final Iterable<DOMStoreThreePhaseCommitCohort> cohorts,
147                 final Optional<DOMDataCommitErrorListener> listener,
148                 final DurationStatsTracker commitStatTracker) {
149             this.tx = Preconditions.checkNotNull(transaction, "transaction must not be null");
150             this.cohorts = Preconditions.checkNotNull(cohorts, "cohorts must not be null");
151             this.currentPhase = CommitPhase.SUBMITTED;
152             this.commitStatTracker = commitStatTracker;
153             this.cohortSize = Iterables.size(cohorts);
154         }
155
156         @Override
157         public Void call() throws TransactionCommitFailedException {
158
159             long startTime = System.nanoTime();
160             try {
161                 canCommitBlocking();
162                 preCommitBlocking();
163                 commitBlocking();
164                 return null;
165             } catch (TransactionCommitFailedException e) {
166                 LOG.warn("Tx: {} Error during phase {}, starting Abort", tx.getIdentifier(), currentPhase, e);
167                 abortBlocking(e);
168                 throw e;
169             } finally {
170                 if(commitStatTracker != null) {
171                     commitStatTracker.addDuration(System.nanoTime() - startTime);
172                 }
173             }
174         }
175
176         /**
177          *
178          * Invokes canCommit on underlying cohorts and blocks till
179          * all results are returned.
180          *
181          * Valid state transition is from SUBMITTED to CAN_COMMIT,
182          * if currentPhase is not SUBMITTED throws IllegalStateException.
183          *
184          * @throws TransactionCommitFailedException
185          *             If one of cohorts failed can Commit
186          *
187          */
188         private void canCommitBlocking() throws TransactionCommitFailedException {
189             for (ListenableFuture<?> canCommit : canCommitAll()) {
190                 try {
191                     final Boolean result = (Boolean)canCommit.get();
192                     if (result == null || !result) {
193                         throw new TransactionCommitFailedException("Can Commit failed, no detailed cause available.");
194                     }
195                 } catch (InterruptedException | ExecutionException e) {
196                     throw TransactionCommitFailedExceptionMapper.CAN_COMMIT_ERROR_MAPPER.apply(e);
197                 }
198             }
199         }
200
201         /**
202          *
203          * Invokes canCommit on underlying cohorts and returns composite future
204          * which will contains {@link Boolean#TRUE} only and only if
205          * all cohorts returned true.
206          *
207          * Valid state transition is from SUBMITTED to CAN_COMMIT,
208          * if currentPhase is not SUBMITTED throws IllegalStateException.
209          *
210          * @return List of all cohorts futures from can commit phase.
211          *
212          */
213         private ListenableFuture<?>[] canCommitAll() {
214             changeStateFrom(CommitPhase.SUBMITTED, CommitPhase.CAN_COMMIT);
215
216             final ListenableFuture<?>[] ops = new ListenableFuture<?>[cohortSize];
217             int i = 0;
218             for (DOMStoreThreePhaseCommitCohort cohort : cohorts) {
219                 ops[i++] = cohort.canCommit();
220             }
221             return ops;
222         }
223
224         /**
225          *
226          * Invokes preCommit on underlying cohorts and blocks till
227          * all results are returned.
228          *
229          * Valid state transition is from CAN_COMMIT to PRE_COMMIT, if current
230          * state is not CAN_COMMIT
231          * throws IllegalStateException.
232          *
233          * @throws TransactionCommitFailedException
234          *             If one of cohorts failed preCommit
235          *
236          */
237         private void preCommitBlocking() throws TransactionCommitFailedException {
238             final ListenableFuture<?>[] preCommitFutures = preCommitAll();
239             try {
240                 for(ListenableFuture<?> future : preCommitFutures) {
241                     future.get();
242                 }
243             } catch (InterruptedException | ExecutionException e) {
244                 throw TransactionCommitFailedExceptionMapper.PRE_COMMIT_MAPPER.apply(e);
245             }
246         }
247
248         /**
249          *
250          * Invokes preCommit on underlying cohorts and returns future
251          * which will complete once all preCommit on cohorts completed or
252          * failed.
253          *
254          *
255          * Valid state transition is from CAN_COMMIT to PRE_COMMIT, if current
256          * state is not CAN_COMMIT
257          * throws IllegalStateException.
258          *
259          * @return List of all cohorts futures from can commit phase.
260          *
261          */
262         private ListenableFuture<?>[] preCommitAll() {
263             changeStateFrom(CommitPhase.CAN_COMMIT, CommitPhase.PRE_COMMIT);
264
265             final ListenableFuture<?>[] ops = new ListenableFuture<?>[cohortSize];
266             int i = 0;
267             for (DOMStoreThreePhaseCommitCohort cohort : cohorts) {
268                 ops[i++] = cohort.preCommit();
269             }
270             return ops;
271         }
272
273         /**
274          *
275          * Invokes commit on underlying cohorts and blocks till
276          * all results are returned.
277          *
278          * Valid state transition is from PRE_COMMIT to COMMIT, if not throws
279          * IllegalStateException.
280          *
281          * @throws TransactionCommitFailedException
282          *             If one of cohorts failed preCommit
283          *
284          */
285         private void commitBlocking() throws TransactionCommitFailedException {
286             final ListenableFuture<?>[] commitFutures = commitAll();
287             try {
288                 for(ListenableFuture<?> future : commitFutures) {
289                     future.get();
290                 }
291             } catch (InterruptedException | ExecutionException e) {
292                 throw TransactionCommitFailedExceptionMapper.COMMIT_ERROR_MAPPER.apply(e);
293             }
294         }
295
296         /**
297          *
298          * Invokes commit on underlying cohorts and returns future which
299          * completes
300          * once all commits on cohorts are completed.
301          *
302          * Valid state transition is from PRE_COMMIT to COMMIT, if not throws
303          * IllegalStateException
304          *
305          * @return List of all cohorts futures from can commit phase.
306          *
307          */
308         private ListenableFuture<?>[] commitAll() {
309             changeStateFrom(CommitPhase.PRE_COMMIT, CommitPhase.COMMIT);
310
311             final ListenableFuture<?>[] ops = new ListenableFuture<?>[cohortSize];
312             int i = 0;
313             for (DOMStoreThreePhaseCommitCohort cohort : cohorts) {
314                 ops[i++] = cohort.commit();
315             }
316             return ops;
317         }
318
319         /**
320          * Aborts transaction.
321          *
322          * Invokes {@link DOMStoreThreePhaseCommitCohort#abort()} on all
323          * cohorts, blocks
324          * for all results. If any of the abort failed throws
325          * IllegalStateException,
326          * which will contains originalCause as suppressed Exception.
327          *
328          * If aborts we're successful throws supplied exception
329          *
330          * @param originalCause
331          *            Exception which should be used to fail transaction for
332          *            consumers of transaction
333          *            future and listeners of transaction failure.
334          * @throws TransactionCommitFailedException
335          *             on invocation of this method.
336          *             originalCa
337          * @throws IllegalStateException
338          *             if abort failed.
339          */
340         private void abortBlocking(final TransactionCommitFailedException originalCause)
341                 throws TransactionCommitFailedException {
342             LOG.warn("Tx: {} Error during phase {}, starting Abort", tx.getIdentifier(), currentPhase, originalCause);
343             Exception cause = originalCause;
344             try {
345                 abortAsyncAll().get();
346             } catch (InterruptedException | ExecutionException e) {
347                 LOG.error("Tx: {} Error during Abort.", tx.getIdentifier(), e);
348                 cause = new IllegalStateException("Abort failed.", e);
349                 cause.addSuppressed(e);
350             }
351             Throwables.propagateIfPossible(cause, TransactionCommitFailedException.class);
352         }
353
354         /**
355          *
356          * Invokes abort on underlying cohorts and returns future which
357          * completes
358          * once all abort on cohorts are completed.
359          *
360          * @return Future which will complete once all cohorts completed
361          *         abort.
362          *
363          */
364         private ListenableFuture<Void> abortAsyncAll() {
365             changeStateFrom(currentPhase, CommitPhase.ABORT);
366
367             final ListenableFuture<?>[] ops = new ListenableFuture<?>[cohortSize];
368             int i = 0;
369             for (DOMStoreThreePhaseCommitCohort cohort : cohorts) {
370                 ops[i++] = cohort.abort();
371             }
372
373             /*
374              * We are returing all futures as list, not only succeeded ones in
375              * order to fail composite future if any of them failed.
376              * See Futures.allAsList for this description.
377              */
378             @SuppressWarnings({ "unchecked", "rawtypes" })
379             ListenableFuture<Void> compositeResult = (ListenableFuture) Futures.allAsList(ops);
380             return compositeResult;
381         }
382
383         /**
384          * Change phase / state of transaction from expected value to new value
385          *
386          * This method checks state and updates state to new state of
387          * of this task if current state equals expected state.
388          * If expected state and current state are different raises
389          * IllegalStateException
390          * which means there is probably bug in implementation of commit
391          * coordination.
392          *
393          * If transition is successful, it logs transition on DEBUG level.
394          *
395          * @param currentExpected
396          *            Required phase for change of state
397          * @param newState
398          *            New Phase which will be entered by transaction.
399          * @throws IllegalStateException
400          *             If currentState of task does not match expected state
401          */
402         private synchronized void changeStateFrom(final CommitPhase currentExpected, final CommitPhase newState) {
403             Preconditions.checkState(currentPhase.equals(currentExpected),
404                     "Invalid state transition: Tx: %s current state: %s new state: %s", tx.getIdentifier(),
405                     currentPhase, newState);
406             LOG.debug("Transaction {}: Phase {} Started ", tx.getIdentifier(), newState);
407             currentPhase = newState;
408         };
409
410     }
411
412 }