BUG-1845: implement proper shutdown sequence
[controller.git] / opendaylight / md-sal / sal-dom-broker / src / main / java / org / opendaylight / controller / md / sal / dom / broker / impl / DOMDataCommitCoordinatorImpl.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
3  * This program and the accompanying materials are made available under the
4  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
5  * and is available at http://www.eclipse.org/legal/epl-v10.html
6  */
7 package org.opendaylight.controller.md.sal.dom.broker.impl;
8
9 import com.google.common.base.Preconditions;
10 import com.google.common.base.Throwables;
11 import com.google.common.collect.Iterables;
12 import com.google.common.util.concurrent.CheckedFuture;
13 import com.google.common.util.concurrent.Futures;
14 import com.google.common.util.concurrent.ListenableFuture;
15 import com.google.common.util.concurrent.ListeningExecutorService;
16 import java.util.concurrent.Callable;
17 import java.util.concurrent.ExecutionException;
18 import java.util.concurrent.RejectedExecutionException;
19 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
20 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
21 import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
22 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
23 import org.opendaylight.yangtools.util.DurationStatsTracker;
24 import org.opendaylight.yangtools.util.concurrent.MappingCheckedFuture;
25 import org.slf4j.Logger;
26 import org.slf4j.LoggerFactory;
27
28 /**
29  *
30  * Implementation of blocking three phase commit coordinator, which which
31  * supports coordination on multiple {@link DOMStoreThreePhaseCommitCohort}.
32  *
33  * This implementation does not support cancelation of commit,
34  *
35  * In order to advance to next phase of three phase commit all subtasks of
36  * previous step must be finish.
37  *
38  * This executor does not have an upper bound on subtask timeout.
39  *
40  *
41  */
42 public class DOMDataCommitCoordinatorImpl implements DOMDataCommitExecutor {
43
44     private static final Logger LOG = LoggerFactory.getLogger(DOMDataCommitCoordinatorImpl.class);
45     private final DurationStatsTracker commitStatsTracker = new DurationStatsTracker();
46     private final ListeningExecutorService executor;
47
48     /**
49      *
50      * Construct DOMDataCommitCoordinator which uses supplied executor to
51      * process commit coordinations.
52      *
53      * @param executor
54      */
55     public DOMDataCommitCoordinatorImpl(final ListeningExecutorService executor) {
56         this.executor = Preconditions.checkNotNull(executor, "executor must not be null.");
57     }
58
59     public DurationStatsTracker getCommitStatsTracker() {
60         return commitStatsTracker;
61     }
62
63     @Override
64     public CheckedFuture<Void,TransactionCommitFailedException> submit(final DOMDataWriteTransaction transaction,
65             final Iterable<DOMStoreThreePhaseCommitCohort> cohorts) {
66         Preconditions.checkArgument(transaction != null, "Transaction must not be null.");
67         Preconditions.checkArgument(cohorts != null, "Cohorts must not be null.");
68         LOG.debug("Tx: {} is submitted for execution.", transaction.getIdentifier());
69
70         ListenableFuture<Void> commitFuture = null;
71         try {
72             commitFuture = executor.submit(new CommitCoordinationTask(transaction, cohorts,
73                     commitStatsTracker));
74         } catch(RejectedExecutionException e) {
75             LOG.error("The commit executor's queue is full - submit task was rejected. \n" +
76                       executor, e);
77             return Futures.immediateFailedCheckedFuture(
78                     new TransactionCommitFailedException(
79                         "Could not submit the commit task - the commit queue capacity has been exceeded.", e));
80         }
81
82         return MappingCheckedFuture.create(commitFuture,
83                 TransactionCommitFailedExceptionMapper.COMMIT_ERROR_MAPPER);
84     }
85
86     /**
87      *
88      * Phase of 3PC commit
89      *
90      * Represents phase of 3PC Commit
91      *
92      *
93      */
94     private static enum CommitPhase {
95         /**
96          *
97          * Commit Coordination Task is submitted for executing
98          *
99          */
100         SUBMITTED,
101         /**
102          * Commit Coordination Task is in can commit phase of 3PC
103          *
104          */
105         CAN_COMMIT,
106         /**
107          * Commit Coordination Task is in pre-commit phase of 3PC
108          *
109          */
110         PRE_COMMIT,
111         /**
112          * Commit Coordination Task is in commit phase of 3PC
113          *
114          */
115         COMMIT,
116         /**
117          * Commit Coordination Task is in abort phase of 3PC
118          *
119          */
120         ABORT
121     }
122
123     /**
124      * Implementation of blocking three-phase commit-coordination tasks without
125      * support of cancellation.
126      */
127     private static final class CommitCoordinationTask implements Callable<Void> {
128         private static final AtomicReferenceFieldUpdater<CommitCoordinationTask, CommitPhase> PHASE_UPDATER =
129                 AtomicReferenceFieldUpdater.newUpdater(CommitCoordinationTask.class, CommitPhase.class, "currentPhase");
130         private final DOMDataWriteTransaction tx;
131         private final Iterable<DOMStoreThreePhaseCommitCohort> cohorts;
132         private final DurationStatsTracker commitStatTracker;
133         private final int cohortSize;
134         private volatile CommitPhase currentPhase = CommitPhase.SUBMITTED;
135
136         public CommitCoordinationTask(final DOMDataWriteTransaction transaction,
137                 final Iterable<DOMStoreThreePhaseCommitCohort> cohorts,
138                 final DurationStatsTracker commitStatTracker) {
139             this.tx = Preconditions.checkNotNull(transaction, "transaction must not be null");
140             this.cohorts = Preconditions.checkNotNull(cohorts, "cohorts must not be null");
141             this.commitStatTracker = commitStatTracker;
142             this.cohortSize = Iterables.size(cohorts);
143         }
144
145         @Override
146         public Void call() throws TransactionCommitFailedException {
147             final long startTime = commitStatTracker != null ? System.nanoTime() : 0;
148
149             try {
150                 canCommitBlocking();
151                 preCommitBlocking();
152                 commitBlocking();
153                 return null;
154             } catch (TransactionCommitFailedException e) {
155                 final CommitPhase phase = currentPhase;
156                 LOG.warn("Tx: {} Error during phase {}, starting Abort", tx.getIdentifier(), phase, e);
157                 abortBlocking(e, phase);
158                 throw e;
159             } finally {
160                 if (commitStatTracker != null) {
161                     commitStatTracker.addDuration(System.nanoTime() - startTime);
162                 }
163             }
164         }
165
166         /**
167          *
168          * Invokes canCommit on underlying cohorts and blocks till
169          * all results are returned.
170          *
171          * Valid state transition is from SUBMITTED to CAN_COMMIT,
172          * if currentPhase is not SUBMITTED throws IllegalStateException.
173          *
174          * @throws TransactionCommitFailedException
175          *             If one of cohorts failed can Commit
176          *
177          */
178         private void canCommitBlocking() throws TransactionCommitFailedException {
179             for (ListenableFuture<?> canCommit : canCommitAll()) {
180                 try {
181                     final Boolean result = (Boolean)canCommit.get();
182                     if (result == null || !result) {
183                         throw new TransactionCommitFailedException("Can Commit failed, no detailed cause available.");
184                     }
185                 } catch (InterruptedException | ExecutionException e) {
186                     throw TransactionCommitFailedExceptionMapper.CAN_COMMIT_ERROR_MAPPER.apply(e);
187                 }
188             }
189         }
190
191         /**
192          *
193          * Invokes canCommit on underlying cohorts and returns composite future
194          * which will contains {@link Boolean#TRUE} only and only if
195          * all cohorts returned true.
196          *
197          * Valid state transition is from SUBMITTED to CAN_COMMIT,
198          * if currentPhase is not SUBMITTED throws IllegalStateException.
199          *
200          * @return List of all cohorts futures from can commit phase.
201          *
202          */
203         private ListenableFuture<?>[] canCommitAll() {
204             changeStateFrom(CommitPhase.SUBMITTED, CommitPhase.CAN_COMMIT);
205
206             final ListenableFuture<?>[] ops = new ListenableFuture<?>[cohortSize];
207             int i = 0;
208             for (DOMStoreThreePhaseCommitCohort cohort : cohorts) {
209                 ops[i++] = cohort.canCommit();
210             }
211             return ops;
212         }
213
214         /**
215          *
216          * Invokes preCommit on underlying cohorts and blocks till
217          * all results are returned.
218          *
219          * Valid state transition is from CAN_COMMIT to PRE_COMMIT, if current
220          * state is not CAN_COMMIT
221          * throws IllegalStateException.
222          *
223          * @throws TransactionCommitFailedException
224          *             If one of cohorts failed preCommit
225          *
226          */
227         private void preCommitBlocking() throws TransactionCommitFailedException {
228             final ListenableFuture<?>[] preCommitFutures = preCommitAll();
229             try {
230                 for(ListenableFuture<?> future : preCommitFutures) {
231                     future.get();
232                 }
233             } catch (InterruptedException | ExecutionException e) {
234                 throw TransactionCommitFailedExceptionMapper.PRE_COMMIT_MAPPER.apply(e);
235             }
236         }
237
238         /**
239          *
240          * Invokes preCommit on underlying cohorts and returns future
241          * which will complete once all preCommit on cohorts completed or
242          * failed.
243          *
244          *
245          * Valid state transition is from CAN_COMMIT to PRE_COMMIT, if current
246          * state is not CAN_COMMIT
247          * throws IllegalStateException.
248          *
249          * @return List of all cohorts futures from can commit phase.
250          *
251          */
252         private ListenableFuture<?>[] preCommitAll() {
253             changeStateFrom(CommitPhase.CAN_COMMIT, CommitPhase.PRE_COMMIT);
254
255             final ListenableFuture<?>[] ops = new ListenableFuture<?>[cohortSize];
256             int i = 0;
257             for (DOMStoreThreePhaseCommitCohort cohort : cohorts) {
258                 ops[i++] = cohort.preCommit();
259             }
260             return ops;
261         }
262
263         /**
264          *
265          * Invokes commit on underlying cohorts and blocks till
266          * all results are returned.
267          *
268          * Valid state transition is from PRE_COMMIT to COMMIT, if not throws
269          * IllegalStateException.
270          *
271          * @throws TransactionCommitFailedException
272          *             If one of cohorts failed preCommit
273          *
274          */
275         private void commitBlocking() throws TransactionCommitFailedException {
276             final ListenableFuture<?>[] commitFutures = commitAll();
277             try {
278                 for(ListenableFuture<?> future : commitFutures) {
279                     future.get();
280                 }
281             } catch (InterruptedException | ExecutionException e) {
282                 throw TransactionCommitFailedExceptionMapper.COMMIT_ERROR_MAPPER.apply(e);
283             }
284         }
285
286         /**
287          *
288          * Invokes commit on underlying cohorts and returns future which
289          * completes
290          * once all commits on cohorts are completed.
291          *
292          * Valid state transition is from PRE_COMMIT to COMMIT, if not throws
293          * IllegalStateException
294          *
295          * @return List of all cohorts futures from can commit phase.
296          *
297          */
298         private ListenableFuture<?>[] commitAll() {
299             changeStateFrom(CommitPhase.PRE_COMMIT, CommitPhase.COMMIT);
300
301             final ListenableFuture<?>[] ops = new ListenableFuture<?>[cohortSize];
302             int i = 0;
303             for (DOMStoreThreePhaseCommitCohort cohort : cohorts) {
304                 ops[i++] = cohort.commit();
305             }
306             return ops;
307         }
308
309         /**
310          * Aborts transaction.
311          *
312          * Invokes {@link DOMStoreThreePhaseCommitCohort#abort()} on all
313          * cohorts, blocks
314          * for all results. If any of the abort failed throws
315          * IllegalStateException,
316          * which will contains originalCause as suppressed Exception.
317          *
318          * If aborts we're successful throws supplied exception
319          *
320          * @param originalCause
321          *            Exception which should be used to fail transaction for
322          *            consumers of transaction
323          *            future and listeners of transaction failure.
324          * @param phase phase in which the problem ensued
325          * @throws TransactionCommitFailedException
326          *             on invocation of this method.
327          *             originalCa
328          * @throws IllegalStateException
329          *             if abort failed.
330          */
331         private void abortBlocking(final TransactionCommitFailedException originalCause, final CommitPhase phase)
332                 throws TransactionCommitFailedException {
333             LOG.warn("Tx: {} Error during phase {}, starting Abort", tx.getIdentifier(), phase, originalCause);
334             Exception cause = originalCause;
335             try {
336                 abortAsyncAll(phase).get();
337             } catch (InterruptedException | ExecutionException e) {
338                 LOG.error("Tx: {} Error during Abort.", tx.getIdentifier(), e);
339                 cause = new IllegalStateException("Abort failed.", e);
340                 cause.addSuppressed(e);
341             }
342             Throwables.propagateIfPossible(cause, TransactionCommitFailedException.class);
343         }
344
345         /**
346          * Invokes abort on underlying cohorts and returns future which
347          * completes once all abort on cohorts are completed.
348          *
349          * @param phase phase in which the problem ensued
350          * @return Future which will complete once all cohorts completed
351          *         abort.
352          */
353         private ListenableFuture<Void> abortAsyncAll(final CommitPhase phase) {
354             changeStateFrom(phase, CommitPhase.ABORT);
355
356             final ListenableFuture<?>[] ops = new ListenableFuture<?>[cohortSize];
357             int i = 0;
358             for (DOMStoreThreePhaseCommitCohort cohort : cohorts) {
359                 ops[i++] = cohort.abort();
360             }
361
362             /*
363              * We are returning all futures as list, not only succeeded ones in
364              * order to fail composite future if any of them failed.
365              * See Futures.allAsList for this description.
366              */
367             @SuppressWarnings({ "unchecked", "rawtypes" })
368             ListenableFuture<Void> compositeResult = (ListenableFuture) Futures.allAsList(ops);
369             return compositeResult;
370         }
371
372         /**
373          * Change phase / state of transaction from expected value to new value
374          *
375          * This method checks state and updates state to new state of
376          * of this task if current state equals expected state.
377          * If expected state and current state are different raises
378          * IllegalStateException
379          * which means there is probably bug in implementation of commit
380          * coordination.
381          *
382          * If transition is successful, it logs transition on DEBUG level.
383          *
384          * @param currentExpected
385          *            Required phase for change of state
386          * @param newState
387          *            New Phase which will be entered by transaction.
388          * @throws IllegalStateException
389          *             If currentState of task does not match expected state
390          */
391         private void changeStateFrom(final CommitPhase currentExpected, final CommitPhase newState) {
392             final boolean success = PHASE_UPDATER.compareAndSet(this, currentExpected, newState);
393             Preconditions.checkState(success, "Invalid state transition: Tx: %s expected: %s current: %s target: %s",
394                 tx.getIdentifier(), currentExpected, currentPhase, newState);
395
396             LOG.debug("Transaction {}: Phase {} Started", tx.getIdentifier(), newState);
397         };
398     }
399
400 }