Merge "Fixed for bug : 1171 - issue while creating subnet"
[controller.git] / opendaylight / md-sal / sal-dom-broker / src / main / java / org / opendaylight / controller / md / sal / dom / broker / impl / DOMDataCommitCoordinatorImpl.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
3  * This program and the accompanying materials are made available under the
4  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
5  * and is available at http://www.eclipse.org/legal/epl-v10.html
6  */
7 package org.opendaylight.controller.md.sal.dom.broker.impl;
8
9 import java.util.List;
10 import java.util.concurrent.Callable;
11 import java.util.concurrent.ExecutionException;
12
13 import javax.annotation.concurrent.GuardedBy;
14
15 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
16 import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
17 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
18 import org.slf4j.Logger;
19 import org.slf4j.LoggerFactory;
20
21 import com.google.common.base.Function;
22 import com.google.common.base.Optional;
23 import com.google.common.base.Preconditions;
24 import com.google.common.base.Throwables;
25 import com.google.common.collect.ImmutableList;
26 import com.google.common.collect.ImmutableList.Builder;
27 import com.google.common.util.concurrent.CheckedFuture;
28 import com.google.common.util.concurrent.Futures;
29 import com.google.common.util.concurrent.ListenableFuture;
30 import com.google.common.util.concurrent.ListeningExecutorService;
31
32 /**
33  *
34  * Implementation of blocking three phase commit coordinator, which which
35  * supports coordination on multiple {@link DOMStoreThreePhaseCommitCohort}.
36  *
37  * This implementation does not support cancelation of commit,
38  *
39  * In order to advance to next phase of three phase commit all subtasks of
40  * previous step must be finish.
41  *
42  * This executor does not have an upper bound on subtask timeout.
43  *
44  *
45  */
46 public class DOMDataCommitCoordinatorImpl implements DOMDataCommitExecutor {
47
48     private static final Logger LOG = LoggerFactory.getLogger(DOMDataCommitCoordinatorImpl.class);
49
50     /**
51      * Runs AND binary operation between all booleans in supplied iteration of booleans.
52      *
53      * This method will stop evaluating iterables if first found is false.
54      */
55     private static final Function<Iterable<Boolean>, Boolean> AND_FUNCTION = new Function<Iterable<Boolean>, Boolean>() {
56
57         @Override
58         public Boolean apply(final Iterable<Boolean> input) {
59             for(boolean value : input) {
60                if(!value) {
61                    return Boolean.FALSE;
62                }
63             }
64             return Boolean.TRUE;
65         }
66     };
67
68     private final ListeningExecutorService executor;
69
70     /**
71      *
72      * Construct DOMDataCommitCoordinator which uses supplied executor to
73      * process commit coordinations.
74      *
75      * @param executor
76      */
77     public DOMDataCommitCoordinatorImpl(final ListeningExecutorService executor) {
78         this.executor = Preconditions.checkNotNull(executor, "executor must not be null.");
79     }
80
81     @Override
82     public CheckedFuture<Void,TransactionCommitFailedException> submit(final DOMDataWriteTransaction transaction,
83             final Iterable<DOMStoreThreePhaseCommitCohort> cohorts, final Optional<DOMDataCommitErrorListener> listener) {
84         Preconditions.checkArgument(transaction != null, "Transaction must not be null.");
85         Preconditions.checkArgument(cohorts != null, "Cohorts must not be null.");
86         Preconditions.checkArgument(listener != null, "Listener must not be null");
87         LOG.debug("Tx: {} is submitted for execution.", transaction.getIdentifier());
88         ListenableFuture<Void> commitFuture = executor.submit(new CommitCoordinationTask(
89                 transaction, cohorts, listener));
90         if (listener.isPresent()) {
91             Futures.addCallback(commitFuture, new DOMDataCommitErrorInvoker(transaction, listener.get()));
92         }
93
94         return Futures.makeChecked(commitFuture, TransactionCommitFailedExceptionMapper.COMMIT_ERROR_MAPPER);
95     }
96
97     /**
98      *
99      * Phase of 3PC commit
100      *
101      * Represents phase of 3PC Commit
102      *
103      *
104      */
105     private static enum CommitPhase {
106         /**
107          *
108          * Commit Coordination Task is submitted for executing
109          *
110          */
111         SUBMITTED,
112         /**
113          * Commit Coordination Task is in can commit phase of 3PC
114          *
115          */
116         CAN_COMMIT,
117         /**
118          * Commit Coordination Task is in pre-commit phase of 3PC
119          *
120          */
121         PRE_COMMIT,
122         /**
123          * Commit Coordination Task is in commit phase of 3PC
124          *
125          */
126         COMMIT,
127         /**
128          * Commit Coordination Task is in abort phase of 3PC
129          *
130          */
131         ABORT
132     }
133
134     /**
135      *
136      * Implementation of blocking three-phase commit-coordination tasks without
137      * support of cancelation.
138      *
139      */
140     private static class CommitCoordinationTask implements Callable<Void> {
141
142         private final DOMDataWriteTransaction tx;
143         private final Iterable<DOMStoreThreePhaseCommitCohort> cohorts;
144
145         @GuardedBy("this")
146         private CommitPhase currentPhase;
147
148         public CommitCoordinationTask(final DOMDataWriteTransaction transaction,
149                 final Iterable<DOMStoreThreePhaseCommitCohort> cohorts,
150                 final Optional<DOMDataCommitErrorListener> listener) {
151             this.tx = Preconditions.checkNotNull(transaction, "transaction must not be null");
152             this.cohorts = Preconditions.checkNotNull(cohorts, "cohorts must not be null");
153             this.currentPhase = CommitPhase.SUBMITTED;
154         }
155
156         @Override
157         public Void call() throws TransactionCommitFailedException {
158
159             try {
160                 canCommitBlocking();
161                 preCommitBlocking();
162                 commitBlocking();
163                 return null;
164             } catch (TransactionCommitFailedException e) {
165                 LOG.warn("Tx: {} Error during phase {}, starting Abort", tx.getIdentifier(), currentPhase, e);
166                 abortBlocking(e);
167                 throw e;
168             }
169         }
170
171         /**
172          *
173          * Invokes canCommit on underlying cohorts and blocks till
174          * all results are returned.
175          *
176          * Valid state transition is from SUBMITTED to CAN_COMMIT,
177          * if currentPhase is not SUBMITTED throws IllegalStateException.
178          *
179          * @throws TransactionCommitFailedException
180          *             If one of cohorts failed can Commit
181          *
182          */
183         private void canCommitBlocking() throws TransactionCommitFailedException {
184             final Boolean canCommitResult = canCommitAll().checkedGet();
185             if (!canCommitResult) {
186                 throw new TransactionCommitFailedException("Can Commit failed, no detailed cause available.");
187             }
188         }
189
190         /**
191          *
192          * Invokes preCommit on underlying cohorts and blocks till
193          * all results are returned.
194          *
195          * Valid state transition is from CAN_COMMIT to PRE_COMMIT, if current
196          * state is not CAN_COMMIT
197          * throws IllegalStateException.
198          *
199          * @throws TransactionCommitFailedException
200          *             If one of cohorts failed preCommit
201          *
202          */
203         private void preCommitBlocking() throws TransactionCommitFailedException {
204             preCommitAll().checkedGet();
205         }
206
207         /**
208          *
209          * Invokes commit on underlying cohorts and blocks till
210          * all results are returned.
211          *
212          * Valid state transition is from PRE_COMMIT to COMMIT, if not throws
213          * IllegalStateException.
214          *
215          * @throws TransactionCommitFailedException
216          *             If one of cohorts failed preCommit
217          *
218          */
219         private void commitBlocking() throws TransactionCommitFailedException {
220             commitAll().checkedGet();
221         }
222
223         /**
224          * Aborts transaction.
225          *
226          * Invokes {@link DOMStoreThreePhaseCommitCohort#abort()} on all
227          * cohorts, blocks
228          * for all results. If any of the abort failed throws
229          * IllegalStateException,
230          * which will contains originalCause as suppressed Exception.
231          *
232          * If aborts we're successful throws supplied exception
233          *
234          * @param originalCause
235          *            Exception which should be used to fail transaction for
236          *            consumers of transaction
237          *            future and listeners of transaction failure.
238          * @throws TransactionCommitFailedException
239          *             on invocation of this method.
240          *             originalCa
241          * @throws IllegalStateException
242          *             if abort failed.
243          */
244         private void abortBlocking(final TransactionCommitFailedException originalCause)
245                 throws TransactionCommitFailedException {
246             LOG.warn("Tx: {} Error during phase {}, starting Abort", tx.getIdentifier(), currentPhase, originalCause);
247             Exception cause = originalCause;
248             try {
249                 abortAsyncAll().get();
250             } catch (InterruptedException | ExecutionException e) {
251                 LOG.error("Tx: {} Error during Abort.", tx.getIdentifier(), e);
252                 cause = new IllegalStateException("Abort failed.", e);
253                 cause.addSuppressed(e);
254             }
255             Throwables.propagateIfPossible(cause, TransactionCommitFailedException.class);
256         }
257
258         /**
259          *
260          * Invokes preCommit on underlying cohorts and returns future
261          * which will complete once all preCommit on cohorts completed or
262          * failed.
263          *
264          *
265          * Valid state transition is from CAN_COMMIT to PRE_COMMIT, if current
266          * state is not CAN_COMMIT
267          * throws IllegalStateException.
268          *
269          * @return Future which will complete once all cohorts completed
270          *         preCommit.
271          *         Future throws TransactionCommitFailedException
272          *         If any of cohorts failed preCommit
273          *
274          */
275         private CheckedFuture<Void, TransactionCommitFailedException> preCommitAll() {
276             changeStateFrom(CommitPhase.CAN_COMMIT, CommitPhase.PRE_COMMIT);
277             Builder<ListenableFuture<Void>> ops = ImmutableList.builder();
278             for (DOMStoreThreePhaseCommitCohort cohort : cohorts) {
279                 ops.add(cohort.preCommit());
280             }
281             /*
282              * We are returing all futures as list, not only succeeded ones in
283              * order to fail composite future if any of them failed.
284              * See Futures.allAsList for this description.
285              */
286             @SuppressWarnings({ "unchecked", "rawtypes" })
287             ListenableFuture<Void> compositeResult = (ListenableFuture) Futures.allAsList(ops.build());
288             return Futures.makeChecked(compositeResult, TransactionCommitFailedExceptionMapper.PRE_COMMIT_MAPPER);
289         }
290
291         /**
292          *
293          * Invokes commit on underlying cohorts and returns future which
294          * completes
295          * once all commits on cohorts are completed.
296          *
297          * Valid state transition is from PRE_COMMIT to COMMIT, if not throws
298          * IllegalStateException
299          *
300          * @return Future which will complete once all cohorts completed
301          *         commit.
302          *         Future throws TransactionCommitFailedException
303          *         If any of cohorts failed preCommit
304          *
305          */
306         private CheckedFuture<Void, TransactionCommitFailedException> commitAll() {
307             changeStateFrom(CommitPhase.PRE_COMMIT, CommitPhase.COMMIT);
308             Builder<ListenableFuture<Void>> ops = ImmutableList.builder();
309             for (DOMStoreThreePhaseCommitCohort cohort : cohorts) {
310                 ops.add(cohort.commit());
311             }
312             /*
313              * We are returing all futures as list, not only succeeded ones in
314              * order to fail composite future if any of them failed.
315              * See Futures.allAsList for this description.
316              */
317             @SuppressWarnings({ "unchecked", "rawtypes" })
318             ListenableFuture<Void> compositeResult = (ListenableFuture) Futures.allAsList(ops.build());
319             return Futures.makeChecked(compositeResult, TransactionCommitFailedExceptionMapper.COMMIT_ERROR_MAPPER);
320         }
321
322         /**
323          *
324          * Invokes canCommit on underlying cohorts and returns composite future
325          * which will contains {@link Boolean#TRUE} only and only if
326          * all cohorts returned true.
327          *
328          * Valid state transition is from SUBMITTED to CAN_COMMIT,
329          * if currentPhase is not SUBMITTED throws IllegalStateException.
330          *
331          * @return Future which will complete once all cohorts completed
332          *         preCommit.
333          *         Future throws TransactionCommitFailedException
334          *         If any of cohorts failed preCommit
335          *
336          */
337         private CheckedFuture<Boolean, TransactionCommitFailedException> canCommitAll() {
338             changeStateFrom(CommitPhase.SUBMITTED, CommitPhase.CAN_COMMIT);
339             Builder<ListenableFuture<Boolean>> canCommitOperations = ImmutableList.builder();
340             for (DOMStoreThreePhaseCommitCohort cohort : cohorts) {
341                 canCommitOperations.add(cohort.canCommit());
342             }
343             ListenableFuture<List<Boolean>> allCanCommits = Futures.allAsList(canCommitOperations.build());
344             ListenableFuture<Boolean> allSuccessFuture = Futures.transform(allCanCommits, AND_FUNCTION);
345             return Futures
346                     .makeChecked(allSuccessFuture, TransactionCommitFailedExceptionMapper.CAN_COMMIT_ERROR_MAPPER);
347
348         }
349
350         /**
351          *
352          * Invokes abort on underlying cohorts and returns future which
353          * completes
354          * once all abort on cohorts are completed.
355          *
356          * @return Future which will complete once all cohorts completed
357          *         abort.
358          *
359          */
360         private ListenableFuture<Void> abortAsyncAll() {
361             changeStateFrom(currentPhase, CommitPhase.ABORT);
362             Builder<ListenableFuture<Void>> ops = ImmutableList.builder();
363             for (DOMStoreThreePhaseCommitCohort cohort : cohorts) {
364                 ops.add(cohort.abort());
365             }
366             /*
367              * We are returing all futures as list, not only succeeded ones in
368              * order to fail composite future if any of them failed.
369              * See Futures.allAsList for this description.
370              */
371             @SuppressWarnings({ "unchecked", "rawtypes" })
372             ListenableFuture<Void> compositeResult = (ListenableFuture) Futures.allAsList(ops.build());
373             return compositeResult;
374         }
375
376         /**
377          * Change phase / state of transaction from expected value to new value
378          *
379          * This method checks state and updates state to new state of
380          * of this task if current state equals expected state.
381          * If expected state and current state are different raises
382          * IllegalStateException
383          * which means there is probably bug in implementation of commit
384          * coordination.
385          *
386          * If transition is successful, it logs transition on DEBUG level.
387          *
388          * @param currentExpected
389          *            Required phase for change of state
390          * @param newState
391          *            New Phase which will be entered by transaction.
392          * @throws IllegalStateException
393          *             If currentState of task does not match expected state
394          */
395         private synchronized void changeStateFrom(final CommitPhase currentExpected, final CommitPhase newState) {
396             Preconditions.checkState(currentPhase.equals(currentExpected),
397                     "Invalid state transition: Tx: %s current state: %s new state: %s", tx.getIdentifier(),
398                     currentPhase, newState);
399             LOG.debug("Transaction {}: Phase {} Started ", tx.getIdentifier(), newState);
400             currentPhase = newState;
401         };
402
403     }
404
405 }